kfifo (Circular Buffer)
Linux 커널의 범용 FIFO 큐 구현인 kfifo(원형 버퍼)의 설계 원리, lock-free 단일 생산자/소비자 패턴, API 사용법, 디바이스 드라이버와 로그 버퍼 활용 사례, 성능 최적화 기법을 종합적으로 다룹니다.
핵심 요약
- FIFO 순서 — First In First Out, 먼저 들어간 데이터가 먼저 나옵니다.
- 고정 크기 — 생성 시 크기가 결정되며, 반드시 2의 거듭제곱이어야 합니다.
- Lock-free — 단일 생산자/단일 소비자 구성에서 lock 없이 동작합니다.
- Wrap-around — 버퍼 끝에 도달하면 자동으로 처음으로 돌아갑니다.
- 빠른 연산 — in/out 인덱스를 모듈로 연산 대신 비트 마스킹으로 처리합니다.
단계별 이해
- 원형 버퍼 개념
선형 배열을 원형으로 사용하는 원리를 이해합니다. - 인덱스 관리
in/out 포인터가 어떻게 순환하는지 확인합니다. - Lock-free 조건
단일 생산자/소비자에서 왜 lock이 필요 없는지 학습합니다. - API 사용
kfifo_alloc, kfifo_in, kfifo_out 함수를 익힙니다.
개요 (Overview)
kfifo(kernel FIFO)는 커널이 제공하는 범용 원형 버퍼(circular buffer) 구현입니다.
<linux/kfifo.h>에 정의되어 있으며, 고정 크기 FIFO 큐를 효율적으로 관리합니다.
kfifo가 널리 사용되는 이유:
- Lock-free 설계 — 단일 생산자/단일 소비자(SPSC) 구성에서 spinlock이나 mutex 없이 안전하게 동작합니다.
- O(1) 연산 — 삽입(enqueue)과 제거(dequeue)가 상수 시간에 수행됩니다.
- 메모리 효율 — 고정 크기 배열 하나로 구현되어 오버헤드가 최소화됩니다.
- 캐시 친화 — 연속 메모리 공간을 사용하여 CPU 캐시 효율이 높습니다.
- 간단한 API — 할당, 삽입, 제거, 해제의 4가지 핵심 함수만으로 대부분의 작업이 가능합니다.
Lock-free 제약: kfifo의 lock-free 특성은 단일 생산자와 단일 소비자일 때만 보장됩니다. 다중 생산자(MPSC) 또는 다중 소비자(SPMC)가 필요하면 명시적인 동기화(spinlock)가 필요합니다.
원형 버퍼 원리 (Circular Buffer Concept)
원형 버퍼는 선형 배열을 원형처럼 사용하는 자료구조입니다. 두 개의 인덱스(in, out)를 유지합니다:
- in (head, write pointer): 다음 쓰기 위치
- out (tail, read pointer): 다음 읽기 위치
인덱스 관리 (Index Management)
kfifo의 핵심은 인덱스를 어떻게 효율적으로 관리하는가입니다:
/* 전통적인 원형 버퍼 (느림) */
unsigned int in = 0, out = 0;
#define SIZE 8
/* 쓰기 */
buffer[in % SIZE] = data;
in++;
/* 읽기 */
data = buffer[out % SIZE];
out++;
/* 문제: 모듈로(%) 연산은 느림! */
/* kfifo 방식 (빠름) - 크기가 2의 거듭제곱일 때 */
#define SIZE 8 /* 2^3 */
#define MASK (SIZE - 1) /* 0b111 */
/* 쓰기 */
buffer[in & MASK] = data; /* 비트 AND는 1사이클! */
in++;
/* 읽기 */
data = buffer[out & MASK];
out++;
/* in, out은 계속 증가하되, 실제 인덱스는 & MASK로 계산 */
왜 2의 거듭제곱? 크기가 2^n이면 index % size를 index & (size-1)로 대체할 수 있습니다. 모듈로 연산은 수십 사이클이 걸리지만, 비트 AND는 1사이클만에 끝납니다.
Wrap-around 인덱스 비트 레벨 동작
인덱스가 오버플로우될 때 비트 마스킹으로 어떻게 wrap-around되는지 시각화합니다:
kfifo API
struct kfifo 구조
/* include/linux/kfifo.h */
struct __kfifo {
unsigned int in; /* 쓰기 인덱스 (계속 증가) */
unsigned int out; /* 읽기 인덱스 (계속 증가) */
unsigned int mask; /* size - 1 (비트 마스크) */
unsigned int esize; /* 원소 크기 (바이트) */
void *data; /* 실제 버퍼 */
};
#define DECLARE_KFIFO(fifo, type, size) \
struct __kfifo fifo = { \
.in = 0, .out = 0, .mask = size - 1, \
.esize = sizeof(type), \
.data = (void *)array \
}
할당과 해제 (Allocation & Deallocation)
/* 동적 할당 */
int kfifo_alloc(struct kfifo *fifo, unsigned int size, gfp_t gfp_mask);
/* 사용 예 */
struct kfifo my_fifo;
int ret = kfifo_alloc(&my_fifo, 1024, GFP_KERNEL);
if (ret) {
pr_err("kfifo_alloc failed\n");
return ret;
}
/* 정적 할당 (컴파일 타임) */
DECLARE_KFIFO_PTR(my_fifo, int);
kfifo_alloc(&my_fifo, 128, GFP_KERNEL);
/* 해제 */
void kfifo_free(struct kfifo *fifo);
삽입과 제거 (Enqueue & Dequeue)
/* 데이터 삽입 (enqueue) */
unsigned int kfifo_in(struct kfifo *fifo, const void *buf,
unsigned int len);
/* 반환값: 실제로 삽입된 바이트 수 (버퍼 가득 차면 len보다 작을 수 있음) */
/* 데이터 제거 (dequeue) */
unsigned int kfifo_out(struct kfifo *fifo, void *buf,
unsigned int len);
/* 반환값: 실제로 읽은 바이트 수 (버퍼 비어있으면 len보다 작을 수 있음) */
/* Peek (읽지 않고 확인만) */
unsigned int kfifo_out_peek(struct kfifo *fifo, void *buf,
unsigned int len);
/* 사용 예 */
char data[64] = "Hello, kfifo!";
unsigned int ret;
ret = kfifo_in(&my_fifo, data, strlen(data));
pr_info("Wrote %u bytes\n", ret);
char output[64];
ret = kfifo_out(&my_fifo, output, sizeof(output));
pr_info("Read %u bytes: %s\n", ret, output);
상태 확인 (Query Operations)
/* 버퍼 크기 */
unsigned int kfifo_size(struct kfifo *fifo);
/* 사용 가능한 바이트 수 */
unsigned int kfifo_avail(struct kfifo *fifo);
/* 저장된 데이터 바이트 수 */
unsigned int kfifo_len(struct kfifo *fifo);
/* 비어있는가? */
bool kfifo_is_empty(struct kfifo *fifo);
/* 가득 찼는가? */
bool kfifo_is_full(struct kfifo *fifo);
/* 초기화 (데이터 모두 삭제) */
void kfifo_reset(struct kfifo *fifo);
Lock-free 원리 (Lock-Free Mechanism)
kfifo는 단일 생산자/단일 소비자 구성에서 lock 없이 안전하게 동작합니다. 핵심 원리:
- 독립적인 인덱스:
in은 producer만,out은 consumer만 수정합니다. - 메모리 배리어:
smp_wmb()와smp_rmb()로 순서를 보장합니다. - 단조 증가:
in과out은 절대 감소하지 않으므로 경합이 발생하지 않습니다.
/* lib/kfifo.c - kfifo_in 간략화 버전 */
unsigned int kfifo_in(struct __kfifo *fifo, const void *buf,
unsigned int len)
{
unsigned int l;
/* 사용 가능한 공간 계산 */
l = kfifo_unused(fifo);
if (len > l)
len = l;
/* 데이터 복사 */
__kfifo_in_data(fifo, buf, len, fifo->in);
/* 쓰기 배리어: 데이터 쓰기가 in 업데이트보다 먼저 보이도록 보장 */
smp_wmb();
/* in 인덱스 업데이트 (producer만 수정) */
fifo->in += len;
return len;
}
/* kfifo_out 간략화 버전 */
unsigned int kfifo_out(struct __kfifo *fifo, void *buf,
unsigned int len)
{
unsigned int l;
/* 사용 가능한 데이터 계산 */
l = fifo->in - fifo->out;
if (len > l)
len = l;
/* 읽기 배리어: in 읽기가 데이터 읽기보다 먼저 보이도록 보장 */
smp_rmb();
/* 데이터 복사 */
__kfifo_out_data(fifo, buf, len, fifo->out);
/* out 인덱스 업데이트 (consumer만 수정) */
fifo->out += len;
return len;
}
다중 생산자/소비자: 여러 CPU에서 동시에 kfifo_in()을 호출하거나, 여러 CPU에서 kfifo_out()을 호출하면 경합이 발생합니다. 이 경우 spinlock으로 보호해야 합니다.
메모리 배리어 동작 원리
Lock-free kfifo가 안전하게 동작하는 핵심은 메모리 배리어입니다. 다중 CPU 환경에서의 동작을 시각화합니다:
메모리 배리어의 역할: CPU는 성능을 위해 메모리 연산 순서를 재배치할 수 있습니다. 메모리 배리어는 특정 순서를 강제하여 다른 CPU에서 올바른 순서로 보이도록 보장합니다. lock은 배리어를 포함하지만, 배리어만 사용하면 훨씬 가볍습니다.
실사용 사례 (Real-World Usage)
디바이스 드라이버 버퍼링
문자 디바이스 드라이버에서 IRQ 핸들러와 read() 시스템 콜 사이의 버퍼:
/* drivers/tty/serial/serial_core.c 유사 패턴 */
struct uart_port {
struct kfifo rx_fifo;
/* ... */
};
/* IRQ 핸들러 (producer) */
static irqreturn_t uart_irq_handler(int irq, void *dev_id)
{
struct uart_port *port = dev_id;
char ch;
/* 하드웨어에서 문자 읽기 */
ch = readb(port->membase + UART_RX_REG);
/* kfifo에 저장 (lock-free!) */
kfifo_in(&port->rx_fifo, &ch, 1);
return IRQ_HANDLED;
}
/* read() 시스템 콜 (consumer) */
static ssize_t uart_read(struct file *file, char __user *buf,
size_t count, loff_t *ppos)
{
struct uart_port *port = file_private_data(file);
char tmp[128];
unsigned int copied;
/* kfifo에서 읽기 (lock-free!) */
copied = kfifo_out(&port->rx_fifo, tmp, min(count, sizeof(tmp)));
if (copy_to_user(buf, tmp, copied))
return -EFAULT;
return copied;
}
커널 로그 버퍼
/* kernel/printk/printk.c 유사 패턴 */
static DEFINE_KFIFO(printk_fifo, char, 1024 * 16);
int vprintk_emit(const char *fmt, va_list args)
{
char text[512];
int len;
len = vsnprintf(text, sizeof(text), fmt, args);
/* 로그 메시지를 kfifo에 저장 */
kfifo_in(&printk_fifo, text, len);
/* 콘솔 출력 등 */
console_unlock();
return len;
}
실습 예제 (Hands-On Example)
간단한 문자 디바이스 드라이버로 kfifo를 활용하는 예제입니다:
/* chardev_kfifo.c - kfifo 기반 문자 디바이스 */
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/uaccess.h>
#define DEVICE_NAME "kfifo_dev"
#define FIFO_SIZE 1024
static dev_t dev_num;
static struct cdev c_dev;
static struct class *dev_class;
static DEFINE_KFIFO(my_fifo, char, FIFO_SIZE);
static ssize_t dev_read(struct file *f, char __user *buf,
size_t len, loff_t *off)
{
char tmp[256];
unsigned int copied;
copied = kfifo_out(&my_fifo, tmp, min(len, sizeof(tmp)));
if (copied == 0)
return 0; /* EOF */
if (copy_to_user(buf, tmp, copied))
return -EFAULT;
pr_info("Read %u bytes\n", copied);
return copied;
}
static ssize_t dev_write(struct file *f, const char __user *buf,
size_t len, loff_t *off)
{
char tmp[256];
unsigned int copied;
size_t to_copy = min(len, sizeof(tmp));
if (copy_from_user(tmp, buf, to_copy))
return -EFAULT;
copied = kfifo_in(&my_fifo, tmp, to_copy);
pr_info("Wrote %u bytes (available: %u)\n",
copied, kfifo_avail(&my_fifo));
return copied;
}
static struct file_operations fops = {
.owner = THIS_MODULE,
.read = dev_read,
.write = dev_write,
};
static int __init chardev_init(void)
{
if (alloc_chrdev_region(&dev_num, 0, 1, DEVICE_NAME) < 0)
return -1;
cdev_init(&c_dev, &fops);
if (cdev_add(&c_dev, dev_num, 1) < 0) {
unregister_chrdev_region(dev_num, 1);
return -1;
}
dev_class = class_create(THIS_MODULE, DEVICE_NAME);
device_create(dev_class, NULL, dev_num, NULL, DEVICE_NAME);
pr_info("kfifo device registered (buffer=%u bytes)\n",
kfifo_size(&my_fifo));
return 0;
}
static void __exit chardev_exit(void)
{
device_destroy(dev_class, dev_num);
class_destroy(dev_class);
cdev_del(&c_dev);
unregister_chrdev_region(dev_num, 1);
pr_info("kfifo device unregistered\n");
}
module_init(chardev_init);
module_exit(chardev_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("MINZKN");
MODULE_DESCRIPTION("kfifo character device example");
사용 예제
# 모듈 로드
sudo insmod chardev_kfifo.ko
# 디바이스 확인
ls -l /dev/kfifo_dev
# 데이터 쓰기
echo "Hello, kfifo!" > /dev/kfifo_dev
# 데이터 읽기
cat /dev/kfifo_dev
# 출력: Hello, kfifo!
# 커널 로그 확인
dmesg | tail
# kfifo device registered (buffer=1024 bytes)
# Wrote 14 bytes (available: 1010)
# Read 14 bytes
성능 최적화 (Performance Optimization)
| 기법 | 설명 | 효과 |
|---|---|---|
| 2의 거듭제곱 크기 | 모듈로 대신 비트 마스킹 사용 | 인덱스 계산 10x 빠름 |
| Lock-free SPSC | 단일 생산자/소비자에서 lock 제거 | 동기화 오버헤드 제거 |
| 메모리 배리어 | smp_wmb/smp_rmb로 순서 보장 | 락보다 저렴 |
| 연속 메모리 | 캐시 라인 활용 | 캐시 미스 감소 |
벤치마크: lock-free kfifo는 spinlock 기반 큐보다 약 3-5배 빠르며, 특히 고빈도 IRQ 환경에서 성능 차이가 극대화됩니다.
kfifo 동시성 검증 체크리스트
kfifo는 SPSC에서 lock-free가 가능하지만, MPMC 상황에서는 반드시 외부 동기화가 필요합니다. 사용 패턴을 먼저 고정한 뒤 API를 선택해야 안전합니다.
| 시나리오 | 권장 방식 | 주의점 |
|---|---|---|
| 단일 생산자 / 단일 소비자 | lock-free kfifo | 메모리 배리어 의미를 이해하고 사용 |
| 다중 생산자 또는 다중 소비자 | spinlock + kfifo | irq 컨텍스트면 irqsave 계열 사용 |
| 유저 공간 복사 포함 | kfifo_to_user/kfifo_from_user | 부분 복사/오류 처리 분기 점검 |
# 빈번한 오용 점검
git grep -n "kfifo_in\\|kfifo_out\\|kfifo_to_user\\|kfifo_from_user" -- "*.c"
git grep -n "spin_lock.*kfifo" -- "*.c"
Power-of-2 마스킹 최적화 심화
kfifo의 성능 핵심은 버퍼 크기를 2의 거듭제곱으로 강제하는 설계입니다. 이 제약 덕분에 비용이 큰 나눗셈/모듈로 연산을 비트 AND 한 번으로 대체할 수 있습니다.
2의 거듭제곱 검증 원리
커널은 size & (size - 1) == 0 패턴으로 2의 거듭제곱 여부를 판별합니다. 이 기법은 2의 거듭제곱이 항상 단 하나의 비트만 1인 점을 활용합니다.
커널 소스에서 이 검증이 어떻게 사용되는지 확인합니다:
/* include/linux/kfifo.h */
/**
* kfifo_alloc - 동적 kfifo 할당
* @fifo: 초기화할 fifo 포인터
* @size: 요청 크기 (2의 거듭제곱으로 반올림됨)
* @gfp_mask: GFP 플래그
*/
static inline int __kfifo_alloc(struct __kfifo *fifo,
unsigned int size, size_t esize, gfp_t gfp_mask)
{
/* 2의 거듭제곱으로 반올림 */
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize;
/* mask = size - 1: 모든 인덱스 연산에 사용 */
fifo->mask = size - 1;
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data)
return -ENOMEM;
return 0;
}
마스킹 인덱스 상세 동작
마스킹 연산의 핵심은 in과 out이 무한히 증가하도록 두고, 실제 배열 인덱스만 마스크로 계산하는 것입니다. 이 설계 덕분에 랩어라운드 판단이 불필요합니다.
/* include/linux/kfifo.h - roundup_pow_of_two 내부 */
/**
* roundup_pow_of_two - 주어진 값보다 크거나 같은 최소 2의 거듭제곱 반환
*
* 예: roundup_pow_of_two(5) = 8
* roundup_pow_of_two(8) = 8
* roundup_pow_of_two(9) = 16
*/
static inline unsigned long __roundup_pow_of_two(unsigned long n)
{
return 1UL << fls_long(n - 1);
}
/* fls_long: find last set bit (최상위 설정 비트 위치)
* fls_long(4) = 3 (bit 2), fls_long(7) = 3
* 1UL << 3 = 8
*/
Lock-free 정확성 증명
kfifo의 SPSC(단일 생산자/단일 소비자) lock-free 동작이 정확한 이유를 메모리 순서 관점에서 증명합니다.
핵심은 smp_wmb()와 smp_rmb()가 올바른 위치에서 관찰 순서를 보장한다는 점입니다.
메모리 순서와 SPSC 안전성
이 순서 보장이 왜 안전한지 형식적으로 정리합니다:
- 불변식 1:
in은 생산자만 수정하고,out은 소비자만 수정합니다 (변수 소유권 분리). - 불변식 2: 생산자는 데이터를 쓴 후에
smp_wmb()를 호출하고, 그 후에in을 증가시킵니다. - 불변식 3: 소비자는
in을 읽은 후에smp_rmb()를 호출하고, 그 후에 데이터를 읽습니다. - 결론: 소비자가 새로운
in값을 관찰했다면,wmb→rmb쌍에 의해 대응하는 데이터도 반드시 관찰 가능합니다. 따라서 stale 데이터 읽기가 불가능합니다.
다중 생산자 문제
두 생산자가 동시에 kfifo에 쓰려 하면 어떤 문제가 발생하는지 봅니다:
__kfifo_in()과 __kfifo_out()의 실제 구현에서 배리어 배치를 확인합니다:
/* lib/kfifo.c - __kfifo_in() 핵심 흐름 */
static void kfifo_copy_in(struct __kfifo *fifo,
const void *src, unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask; /* 실제 배열 인덱스 */
/* esize가 0이면 바이트 단위 */
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off); /* 끝까지 복사 가능한 양 */
memcpy(fifo->data + off, src, l); /* 첫 번째 조각 */
memcpy(fifo->data, src + l, len - l); /* wrap 부분 (있으면) */
}
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo); /* 빈 공간 계산 */
if (len > l)
len = l; /* 빈 공간만큼만 쓰기 */
kfifo_copy_in(fifo, buf, len, fifo->in);
/* ★ 핵심: 데이터 쓰기 완료 후 write barrier */
smp_wmb();
/* 그 후에 in 갱신 → 소비자가 새 데이터를 볼 수 있음 */
fifo->in += len;
return len;
}
unsigned int __kfifo_out(struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
l = fifo->in - fifo->out; /* 사용 가능한 데이터 */
if (len > l)
len = l;
/* ★ 핵심: in 읽기 후 read barrier */
smp_rmb();
kfifo_copy_out(fifo, buf, len, fifo->out);
/* 데이터 읽기 완료 후 write barrier */
smp_wmb();
/* 그 후에 out 갱신 → 생산자가 공간 회수 가능 */
fifo->out += len;
return len;
}
smp_wmb()는 반드시 대응하는 smp_rmb()와 쌍을 이루어야 합니다.
생산자의 wmb(데이터→in)가 소비자의 rmb(in→데이터)와 짝을 이루어 크로스-CPU 가시성을 보장합니다.
배리어 하나만 있으면 다른 CPU에서 순서 재배치가 발생할 수 있습니다.
kfifo 내부 구조 분석
kfifo의 핵심 데이터 구조인 struct __kfifo의 각 필드와 헬퍼 함수들이 어떻게 동작하는지 상세히 분석합니다.
구조체 레이아웃
/* include/linux/kfifo.h */
struct __kfifo {
unsigned int in; /* 쓰기 오프셋 (생산자만 수정) */
unsigned int out; /* 읽기 오프셋 (소비자만 수정) */
unsigned int mask; /* size - 1, 인덱스 마스킹용 */
unsigned int esize; /* 요소 크기 (바이트), 0이면 바이트 스트림 */
void *data; /* 실제 데이터 버퍼 포인터 */
};
unsigned 산술과 래핑 정확성
in과 out이 unsigned int 범위(0 ~ 232-1)를 넘어 오버플로우해도 올바르게 동작하는 이유를 분석합니다.
/* include/linux/kfifo.h - 헬퍼 함수들 */
/* 현재 저장된 데이터 개수 */
static inline unsigned int kfifo_len(struct __kfifo *fifo)
{
return fifo->in - fifo->out; /* unsigned 뺄셈 → 자동 wrap */
}
/* 남은 빈 공간 */
static inline unsigned int kfifo_avail(struct __kfifo *fifo)
{
return kfifo_size(fifo) - kfifo_len(fifo);
}
/* 비어 있는가? */
static inline bool kfifo_is_empty(struct __kfifo *fifo)
{
return fifo->in == fifo->out; /* 같으면 비어 있음 */
}
/* 가득 찼는가? */
static inline bool kfifo_is_full(struct __kfifo *fifo)
{
return kfifo_len(fifo) > fifo->mask;
/* mask = size-1 이므로, len > mask ↔ len >= size */
}
/* 전체 크기 */
static inline unsigned int kfifo_size(struct __kfifo *fifo)
{
return fifo->mask + 1;
}
in/out을 리셋하지 않는 이유는 단순합니다.
리셋에는 양쪽(생산자/소비자) 모두의 동의가 필요하고, 이는 동기화를 요구합니다.
unsigned 오버플로우의 수학적 성질을 활용하면 리셋 없이도 무한히 동작할 수 있습니다.
DMA scatter/gather 연동
kfifo 데이터를 DMA 엔진으로 직접 전송할 때, 버퍼가 원형으로 wrap될 수 있으므로 scatter/gather 리스트가 필요합니다. 커널은 이를 위해 전용 DMA 헬퍼 API를 제공합니다.
DMA 영역 분할 문제
DMA 헬퍼 API
/* include/linux/kfifo.h - DMA 관련 매크로/함수 */
/**
* kfifo_dma_in_prepare - DMA 입력을 위한 scatterlist 준비
* @fifo: kfifo 포인터
* @sgl: scatterlist 배열 (최소 2개 엔트리)
* @nents: scatterlist 엔트리 수
* @len: 전송할 데이터 길이
*
* 반환: 실제 사용된 scatterlist 엔트리 수 (1 또는 2)
*/
#define kfifo_dma_in_prepare(fifo, sgl, nents, len) \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
struct scatterlist *__sgl = (sgl); \
unsigned int __len = (len); \
const size_t __recsize = sizeof(*__tmp->type); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
(__recsize) ? \
__kfifo_dma_in_prepare_r(__kfifo, __sgl, nents, __len, __recsize) : \
__kfifo_dma_in_prepare(__kfifo, __sgl, nents, __len); \
})
/**
* kfifo_dma_in_finish - DMA 입력 완료 통지
* @fifo: kfifo 포인터
* @len: 실제 DMA로 전송된 바이트 수
*
* DMA 전송 완료 후 호출하여 in 포인터를 갱신
*/
#define kfifo_dma_in_finish(fifo, len) \
__kfifo_dma_in_finish(&(fifo)->kfifo, len)
/**
* kfifo_dma_out_prepare - DMA 출력을 위한 scatterlist 준비
* kfifo_dma_out_finish - DMA 출력 완료 통지 (out 갱신)
*/
실제 디바이스 드라이버에서의 DMA + kfifo 통합 예제:
/* DMA scatter/gather로 kfifo에서 디바이스로 전송 */
static int my_dma_tx(struct my_device *dev)
{
struct scatterlist sg[2]; /* 최대 2개 (wrap 가능) */
unsigned int nents;
struct dma_async_tx_descriptor *desc;
sg_init_table(sg, 2);
/* kfifo에서 DMA용 scatterlist 준비 */
nents = kfifo_dma_out_prepare(&dev->tx_fifo, sg, 2,
kfifo_len(&dev->tx_fifo));
if (!nents)
return 0; /* 전송할 데이터 없음 */
/* scatterlist를 DMA 매핑 */
nents = dma_map_sg(dev->dev, sg, nents, DMA_TO_DEVICE);
/* DMA 디스크립터 생성 */
desc = dmaengine_prep_slave_sg(dev->dma_chan, sg, nents,
DMA_MEM_TO_DEV,
DMA_PREP_INTERRUPT);
desc->callback = my_dma_tx_complete;
desc->callback_param = dev;
/* DMA 전송 시작 */
dmaengine_submit(desc);
dma_async_issue_pending(dev->dma_chan);
return 0;
}
/* DMA 완료 콜백 */
static void my_dma_tx_complete(void *param)
{
struct my_device *dev = param;
unsigned int len = kfifo_len(&dev->tx_fifo);
/* DMA 완료 후 kfifo out 포인터 갱신 */
kfifo_dma_out_finish(&dev->tx_fifo, len);
/* DMA 매핑 해제 */
dma_unmap_sg(dev->dev, dev->sg, 2, DMA_TO_DEVICE);
}
kfifo_dma_in_prepare()와 kfifo_dma_in_finish() 사이에
kfifo에 직접 데이터를 쓰면 안 됩니다. DMA 엔진이 물리 주소를 기반으로 전송하므로,
중간에 데이터가 변경되면 일관성이 깨집니다. 반드시 finish() 호출 후에 다음 작업을 시작하세요.
레코드 모드 (Record FIFO)
기본 kfifo는 고정 크기 요소만 처리합니다. 가변 길이 데이터(로그 메시지, 네트워크 패킷 등)를 위해 커널은 레코드 모드를 제공합니다. 각 레코드 앞에 길이 접두사를 붙여 경계를 구분합니다.
레코드 레이아웃
/* include/linux/kfifo.h - 레코드 모드 선언 */
/* 1바이트 레코드 헤더: 레코드 최대 255바이트 */
struct my_rec1_fifo STRUCT_KFIFO_REC_1(256);
/* 2바이트 레코드 헤더: 레코드 최대 65535바이트 */
struct my_rec2_fifo STRUCT_KFIFO_REC_2(4096);
/* 초기화 */
INIT_KFIFO(my_fifo);
/* 레코드 넣기: 길이 접두사가 자동으로 추가됨 */
struct log_entry {
u64 timestamp;
u32 level;
char msg[200];
};
struct log_entry entry = {
.timestamp = ktime_get_ns(),
.level = KERN_WARNING,
.msg = "디스크 I/O 지연 감지",
};
/* kfifo_in은 sizeof(entry) 바이트 + 길이 접두사를 저장 */
kfifo_in(&my_fifo, &entry, sizeof(entry));
/* 짧은 메시지도 같은 fifo에 저장 가능 (가변 길이!) */
struct log_entry short_entry = {
.timestamp = ktime_get_ns(),
.level = KERN_INFO,
.msg = "OK",
};
kfifo_in(&my_fifo, &short_entry, offsetof(struct log_entry, msg) + 3);
/* 레코드 꺼내기: 저장된 길이만큼만 복사 */
struct log_entry out_entry;
unsigned int len = kfifo_out(&my_fifo, &out_entry, sizeof(out_entry));
/* len = 실제 레코드 크기 (접두사 제외) */
/* 다음 레코드 크기 미리 확인 (꺼내지 않고) */
unsigned int next_len = kfifo_peek_len(&my_fifo);
- 커널 로그 버퍼: 가변 길이 로그 메시지 저장
- Netlink 메시지 큐잉: 크기가 다른 netlink 메시지 버퍼링
- USB 요청 블록: 가변 크기 URB 데이터 임시 저장
- 오디오 프레임: 다양한 크기의 오디오 패킷 버퍼링
문자 디바이스 드라이버 통합
kfifo는 문자 디바이스 드라이버에서 사용자 공간과 커널 공간 사이의 데이터 교환 버퍼로 자주 사용됩니다.
kfifo_to_user()와 kfifo_from_user()가 copy_to_user()/copy_from_user()를 내부적으로 처리합니다.
데이터 흐름
완전한 문자 디바이스 드라이버 예제
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/poll.h>
#include <linux/wait.h>
#define FIFO_SIZE 1024 /* 2의 거듭제곱 */
#define DEV_NAME "kfifo_example"
static struct cdev my_cdev;
static dev_t dev_num;
static struct class *dev_class;
/* 정적 kfifo 선언 (바이트 스트림 모드) */
static DECLARE_KFIFO(my_fifo, unsigned char, FIFO_SIZE);
/* 읽기 대기 큐 */
static DECLARE_WAIT_QUEUE_HEAD(read_wq);
static DECLARE_WAIT_QUEUE_HEAD(write_wq);
/* 다중 접근 보호용 뮤텍스 */
static DEFINE_MUTEX(fifo_lock);
static ssize_t my_read(struct file *file,
char __user *ubuf, size_t count, loff_t *ppos)
{
int ret;
unsigned int copied;
/* non-blocking 모드에서 비어있으면 즉시 반환 */
if (kfifo_is_empty(&my_fifo)) {
if (file->f_flags & O_NONBLOCK)
return -EAGAIN;
/* blocking 모드: 데이터가 올 때까지 대기 */
ret = wait_event_interruptible(read_wq,
!kfifo_is_empty(&my_fifo));
if (ret)
return ret;
}
mutex_lock(&fifo_lock);
ret = kfifo_to_user(&my_fifo, ubuf, count, &copied);
mutex_unlock(&fifo_lock);
if (ret)
return ret;
/* 공간이 생겼으므로 write 대기자 깨우기 */
if (!kfifo_is_full(&my_fifo))
wake_up_interruptible(&write_wq);
return copied;
}
static ssize_t my_write(struct file *file,
const char __user *ubuf, size_t count, loff_t *ppos)
{
int ret;
unsigned int copied;
if (kfifo_is_full(&my_fifo)) {
if (file->f_flags & O_NONBLOCK)
return -EAGAIN;
ret = wait_event_interruptible(write_wq,
!kfifo_is_full(&my_fifo));
if (ret)
return ret;
}
mutex_lock(&fifo_lock);
ret = kfifo_from_user(&my_fifo, ubuf, count, &copied);
mutex_unlock(&fifo_lock);
if (ret)
return ret;
/* 데이터가 들어왔으므로 read 대기자 깨우기 */
if (!kfifo_is_empty(&my_fifo))
wake_up_interruptible(&read_wq);
return copied;
}
static __poll_t my_poll(struct file *file,
struct poll_table_struct *wait)
{
__poll_t mask = 0;
poll_wait(file, &read_wq, wait);
poll_wait(file, &write_wq, wait);
if (!kfifo_is_empty(&my_fifo))
mask |= EPOLLIN | EPOLLRDNORM; /* 읽기 가능 */
if (!kfifo_is_full(&my_fifo))
mask |= EPOLLOUT | EPOLLWRNORM; /* 쓰기 가능 */
return mask;
}
static const struct file_operations my_fops = {
.owner = THIS_MODULE,
.read = my_read,
.write = my_write,
.poll = my_poll,
.llseek = noop_llseek,
};
static int __init kfifo_dev_init(void)
{
int ret;
INIT_KFIFO(my_fifo);
ret = alloc_chrdev_region(&dev_num, 0, 1, DEV_NAME);
if (ret < 0)
return ret;
cdev_init(&my_cdev, &my_fops);
ret = cdev_add(&my_cdev, dev_num, 1);
if (ret)
goto err_cdev;
dev_class = class_create(DEV_NAME);
device_create(dev_class, NULL, dev_num, NULL, DEV_NAME);
pr_info("kfifo_example: /dev/%s 생성 완료\n", DEV_NAME);
return 0;
err_cdev:
unregister_chrdev_region(dev_num, 1);
return ret;
}
static void __exit kfifo_dev_exit(void)
{
device_destroy(dev_class, dev_num);
class_destroy(dev_class);
cdev_del(&my_cdev);
unregister_chrdev_region(dev_num, 1);
pr_info("kfifo_example: 디바이스 제거\n");
}
module_init(kfifo_dev_init);
module_exit(kfifo_dev_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("kfifo 기반 문자 디바이스 예제");
사용자 공간에서 테스트:
# 모듈 로드
sudo insmod kfifo_example.ko
# 데이터 쓰기
echo "Hello kfifo!" > /dev/kfifo_example
# 데이터 읽기
cat /dev/kfifo_example
# 출력: Hello kfifo!
# non-blocking 테스트 (비어있을 때)
dd if=/dev/kfifo_example of=/dev/null bs=1 count=1 iflag=nonblock
# dd: /dev/kfifo_example: Resource temporarily unavailable
kfifo_to_user()는 내부적으로 copy_to_user()를 호출하지만,
원형 버퍼의 wrap-around를 자동으로 처리합니다 (최대 2번의 copy_to_user() 호출).
직접 copy_to_user()를 사용하면 wrap 처리를 수동으로 해야 합니다.
네트워크 서브시스템 활용
kfifo는 네트워크 관련 드라이버에서 데이터 버퍼링에 널리 사용됩니다. 특히 UART/시리얼 드라이버에서 TX/RX 버퍼로 활용하는 패턴이 대표적입니다.
네트워크 패킷 버퍼링 패턴
커널 시리얼 드라이버에서의 kfifo 활용 코드:
/* drivers/tty/serial/ 기반 UART kfifo 패턴 */
/* IRQ 핸들러: 하드웨어 RX FIFO → kfifo */
static irqreturn_t uart_rx_irq(int irq, void *dev_id)
{
struct my_uart *uart = dev_id;
unsigned char ch;
while (uart_rx_ready(uart)) {
ch = uart_read_byte(uart);
/* lock-free: IRQ가 유일한 생산자 */
if (kfifo_in(&uart->rx_fifo, &ch, 1) == 0)
uart->rx_overrun++; /* 오버런 카운터 */
}
/* 워크큐에서 데이터 처리 예약 */
schedule_work(&uart->rx_work);
return IRQ_HANDLED;
}
/* 워크큐: kfifo → TTY 레이어 */
static void uart_rx_work(struct work_struct *work)
{
struct my_uart *uart = container_of(work,
struct my_uart, rx_work);
unsigned char buf[256];
unsigned int len;
/* lock-free: 워크큐가 유일한 소비자 */
while ((len = kfifo_out(&uart->rx_fifo, buf, sizeof(buf)))) {
tty_insert_flip_string(&uart->port, buf, len);
}
tty_flip_buffer_push(&uart->port);
}
/* TX: TTY 레이어 → kfifo → 하드웨어 */
static void uart_start_tx(struct uart_port *port)
{
struct my_uart *uart = to_my_uart(port);
unsigned char ch;
while (uart_tx_ready(uart) &&
kfifo_out(&uart->tx_fifo, &ch, 1)) {
uart_write_byte(uart, ch);
}
}
ftrace/perf 디버깅 실습
kfifo 기반 드라이버를 디버깅하고 성능을 측정하는 실무적인 방법을 다룹니다. ftrace, bpftrace, perf를 사용하여 kfifo 동작을 관찰하고 병목을 찾는 기법입니다.
ftrace로 kfifo 함수 추적
# kfifo 관련 커널 함수 목록 확인
cat /sys/kernel/debug/tracing/available_filter_functions | grep kfifo
# function tracer 설정
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo function > /sys/kernel/debug/tracing/current_tracer
# kfifo 함수만 필터링
echo '__kfifo_in __kfifo_out __kfifo_alloc' > \
/sys/kernel/debug/tracing/set_ftrace_filter
# 추적 시작
echo 1 > /sys/kernel/debug/tracing/tracing_on
# 드라이버 동작 유발 (예: 시리얼 포트 사용)
echo "test" > /dev/ttyS0
# 추적 결과 확인
cat /sys/kernel/debug/tracing/trace
# 예상 출력:
# kfifo_exampl-1234 [001] .... 12345.678: __kfifo_in <-uart_write
# kfifo_exampl-1234 [001] .... 12345.679: __kfifo_out <-uart_start_tx
# 추적 종료
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo nop > /sys/kernel/debug/tracing/current_tracer
bpftrace로 kfifo 채움 수준 모니터링
/* kfifo_monitor.bt - kfifo 채움 수준 실시간 모니터링 */
/* 실행: sudo bpftrace kfifo_monitor.bt */
/* __kfifo_in 진입 시 삽입 크기 히스토그램 */
kprobe:__kfifo_in
{
@in_size = hist(arg2); /* arg2 = len */
@in_count = count();
}
/* __kfifo_out 진입 시 추출 크기 히스토그램 */
kprobe:__kfifo_out
{
@out_size = hist(arg2);
@out_count = count();
}
/* __kfifo_in 반환값으로 실제 삽입된 양 확인 */
kretprobe:__kfifo_in
{
@actual_in = hist(retval);
if (retval == 0) {
@fifo_full = count(); /* kfifo가 꽉 차서 삽입 실패 */
}
}
/* 주기적 요약 (5초마다) */
interval:s:5
{
printf("--- kfifo 통계 (5초) ---\n");
printf("삽입 횟수: %lld, 추출 횟수: %lld\n",
@in_count, @out_count);
printf("kfifo full 발생: %lld\n", @fifo_full);
print(@in_size);
clear(@in_count); clear(@out_count); clear(@fifo_full);
}
perf probe로 처리량 측정
# kfifo 함수에 동적 probe 추가
sudo perf probe --add '__kfifo_in len=%dx:u32'
sudo perf probe --add '__kfifo_out len=%dx:u32'
# 10초간 kfifo 호출 통계 수집
sudo perf stat -e 'probe:__kfifo_in,probe:__kfifo_out' -a sleep 10
# 예상 출력:
# Performance counter stats for 'system wide':
# 1,234,567 probe:__kfifo_in
# 1,234,000 probe:__kfifo_out
# 10.001234 seconds time elapsed
# 호출 스택과 함께 기록
sudo perf record -g -e 'probe:__kfifo_in' -a sleep 5
sudo perf report --sort comm,symbol
# probe 정리
sudo perf probe --del '__kfifo_in'
sudo perf probe --del '__kfifo_out'
버퍼 오버플로우/언더플로우 디버깅
/* 디버깅용 kfifo 상태 출력 함수 */
static void kfifo_debug_dump(struct kfifo *fifo,
const char *label)
{
pr_debug("%s: in=%u out=%u len=%u avail=%u full=%d empty=%d\n",
label,
fifo->kfifo.in,
fifo->kfifo.out,
kfifo_len(fifo),
kfifo_avail(fifo),
kfifo_is_full(fifo),
kfifo_is_empty(fifo));
}
/* 오버플로우 감지 래퍼 */
static unsigned int kfifo_in_debug(struct kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int avail = kfifo_avail(fifo);
unsigned int ret;
if (len > avail) {
pr_warn("kfifo: 오버플로우 시도! 요청=%u 가용=%u\n",
len, avail);
dump_stack(); /* 호출 스택 출력 */
}
ret = kfifo_in(fifo, buf, len);
if (ret != len)
pr_warn("kfifo: 부분 삽입 %u/%u\n", ret, len);
return ret;
}
pr_debug()는 CONFIG_DYNAMIC_DEBUG가 설정되면 런타임에 켜고 끌 수 있으므로,
pr_info()나 printk() 대신 pr_debug()를 사용하세요.
echo 'module my_driver +p' > /sys/kernel/debug/dynamic_debug/control로 활성화합니다.
kfifo vs 대안 비교
커널에는 kfifo 외에도 여러 링 버퍼 구현이 있습니다. 각각의 설계 목표와 사용 시나리오가 다르므로, 적절한 선택이 중요합니다.
주요 링 버퍼 비교
| 특성 | kfifo | ring_buffer (ftrace) | io_uring SQ/CQ | virtio vring |
|---|---|---|---|---|
| 주 용도 | 범용 데이터 전달 | 이벤트 트레이싱 | 비동기 I/O | 가상화 I/O |
| 동시성 | SPSC lock-free | MPSC lock-free | SPSC lock-free | SPSC/MPSC |
| 요소 크기 | 고정/가변 (레코드 모드) | 가변 (이벤트 헤더) | 고정 (SQE/CQE) | 고정 (descriptor) |
| 메모리 공유 | 커널 내부 | 커널→유저 (mmap) | 커널↔유저 (mmap) | 호스트↔게스트 |
| 오버플로우 처리 | 삽입 거부 (부분 삽입) | 오래된 데이터 덮어쓰기 | 삽입 거부 | 삽입 거부 |
| 크기 제약 | 2의 거듭제곱 | 페이지 정렬 | 2의 거듭제곱 | 2의 거듭제곱 |
| DMA 지원 | scatter/gather API | 없음 | 없음 | scatter/gather |
| 복잡도 | 낮음 (~400줄) | 높음 (~3000줄) | 중간 (~2000줄) | 중간 (~1500줄) |
| 헤더 파일 | linux/kfifo.h |
linux/ring_buffer.h |
linux/io_uring.h |
linux/virtio_ring.h |
ring_buffer의 MPSC 설계에서 유사한 아이디어가 사용됩니다.
소스 코드 워크스루
lib/kfifo.c와 include/linux/kfifo.h의 핵심 함수들을 호출 순서대로 분석합니다.
전체 소스는 약 400줄로, 간결하면서도 견고한 구현입니다.
주요 함수 호출 그래프
__kfifo_alloc() 상세
/* lib/kfifo.c */
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* 크기를 2의 거듭제곱으로 반올림
* 예: size=6 → 8, size=9 → 16
*/
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize; /* 요소 크기 (바이트) */
if (size < 2) {
/* 최소 크기 2: in != out 조건으로 empty/full 구별 */
fifo->data = NULL;
fifo->mask = 0;
return -EINVAL;
}
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
fifo->mask = size - 1; /* 모든 인덱싱에 사용될 마스크 */
return 0;
}
memcpy 분할 복사 상세
원형 버퍼에서 wrap이 발생하면 데이터가 버퍼 끝과 처음에 나뉩니다. kfifo_copy_in()과 kfifo_copy_out()은 이를 최대 2번의 memcpy()로 처리합니다.
/* lib/kfifo.c - kfifo_copy_in 분석 */
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1; /* 전체 버퍼 크기 */
unsigned int esize = fifo->esize; /* 요소 크기 */
unsigned int l;
off &= fifo->mask; /* 실제 배열 인덱스: off mod size */
/* 바이트 단위로 변환 (esize가 1이 아닌 경우) */
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
/*
* l = 현재 위치(off)에서 버퍼 끝까지 복사 가능한 바이트 수
*
* 예: size=64, off=50, len=20
* l = min(20, 64-50) = min(20, 14) = 14
* 1차 memcpy: buf[50..63] ← src[0..13] (14바이트)
* 2차 memcpy: buf[0..5] ← src[14..19] (6바이트)
*
* 예: size=64, off=10, len=20
* l = min(20, 64-10) = min(20, 54) = 20
* 1차 memcpy: buf[10..29] ← src[0..19] (20바이트)
* 2차 memcpy: len-l = 0 → no-op
*/
l = min(len, size - off);
memcpy(fifo->data + off, src, l); /* 1차: 끝까지 */
memcpy(fifo->data, src + l, len - l); /* 2차: 처음부터 */
}
/* kfifo_copy_out도 동일한 분할 로직 (방향만 반대) */
static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(dst, fifo->data + off, l); /* 1차: 끝까지 */
memcpy(dst + l, fifo->data, len - l); /* 2차: 처음부터 */
}
/* __kfifo_free */
void __kfifo_free(struct __kfifo *fifo)
{
kfree(fifo->data);
fifo->in = 0;
fifo->out = 0;
fifo->esize = 0;
fifo->data = NULL;
fifo->mask = 0;
}
memcpy()는 길이 0으로 호출되어
컴파일러가 최적화합니다. 즉, 일반적인 경우 오버헤드 없이 단일 memcpy()만 실행됩니다.
GCC와 Clang 모두 길이 0인 memcpy()를 완전히 제거합니다.
include/linux/kfifo.h: 매크로, 인라인 함수, 타입 안전 래퍼 (약 700줄)lib/kfifo.c: 핵심 구현 함수 (약 400줄)samples/kfifo/: 커널 내장 예제 코드 (bytestream, inttype, record)
git log --oneline lib/kfifo.c로 변경 이력을 확인할 수 있습니다.
관련 문서
kfifo와 관련된 다른 주제를 더 깊이 이해하고 싶다면 다음 문서를 참고하세요.