kfifo (Circular Buffer)
Linux 커널의 범용 FIFO 큐 구현인 kfifo(원형 버퍼(Buffer))의 설계 원리, lock-free 단일 생산자/소비자 패턴, API 사용법, 디바이스 드라이버와 로그 버퍼 활용 사례, 성능 최적화 기법을 종합적으로 다룹니다.
핵심 요약
- FIFO 순서 — First In First Out, 먼저 들어간 데이터가 먼저 나옵니다.
- 고정 크기 — 생성 시 크기가 결정되며, 반드시 2의 거듭제곱이어야 합니다.
- Lock-free — 단일 생산자/단일 소비자 구성에서 lock 없이 동작합니다.
- Wrap-around — 버퍼 끝에 도달하면 자동으로 처음으로 돌아갑니다.
- 빠른 연산 — in/out 인덱스를 모듈로 연산 대신 비트 마스킹으로 처리합니다.
단계별 이해
- 원형 버퍼 개념
선형 배열을 원형으로 사용하는 원리를 이해합니다. - 인덱스 관리
in/out 포인터가 어떻게 순환하는지 확인합니다. - Lock-free 조건
단일 생산자/소비자에서 왜 lock이 필요 없는지 학습합니다. - API 사용
kfifo_alloc, kfifo_in, kfifo_out 함수를 익힙니다.
개요 (Overview)
kfifo(kernel FIFO)는 커널이 제공하는 범용 원형 버퍼(circular buffer) 구현입니다.
<linux/kfifo.h>에 정의되어 있으며, 고정 크기 FIFO 큐를 효율적으로 관리합니다.
kfifo가 널리 사용되는 이유:
- Lock-free 설계 — 단일 생산자/단일 소비자(SPSC) 구성에서 spinlock이나 mutex 없이 안전하게 동작합니다.
- O(1) 연산 — 삽입(enqueue)과 제거(dequeue)가 상수 시간에 수행됩니다.
- 메모리 효율 — 고정 크기 배열 하나로 구현되어 오버헤드(Overhead)가 최소화됩니다.
- 캐시(Cache) 친화 — 연속 메모리 공간을 사용하여 CPU 캐시 효율이 높습니다.
- 간단한 API — 할당, 삽입, 제거, 해제의 4가지 핵심 함수만으로 대부분의 작업이 가능합니다.
Lock-free 제약: kfifo의 lock-free 특성은 단일 생산자와 단일 소비자일 때만 보장됩니다. 다중 생산자(MPSC) 또는 다중 소비자(SPMC)가 필요하면 명시적인 동기화(spinlock)가 필요합니다.
원형 버퍼 원리 (Circular Buffer Concept)
원형 버퍼는 선형 배열을 원형처럼 사용하는 자료구조입니다. 두 개의 인덱스(in, out)를 유지합니다:
- in (head, write pointer): 다음 쓰기 위치
- out (tail, read pointer): 다음 읽기 위치
인덱스 관리 (Index Management)
kfifo의 핵심은 인덱스를 어떻게 효율적으로 관리하는가입니다:
/* 전통적인 원형 버퍼 (느림) */
unsigned int in = 0, out = 0;
#define SIZE 8
/* 쓰기 */
buffer[in % SIZE] = data;
in++;
/* 읽기 */
data = buffer[out % SIZE];
out++;
/* 문제: 모듈로(%) 연산은 느림! */
/* kfifo 방식 (빠름) - 크기가 2의 거듭제곱일 때 */
#define SIZE 8 /* 2^3 */
#define MASK (SIZE - 1) /* 0b111 */
/* 쓰기 */
buffer[in & MASK] = data; /* 비트 AND는 1사이클! */
in++;
/* 읽기 */
data = buffer[out & MASK];
out++;
/* in, out은 계속 증가하되, 실제 인덱스는 & MASK로 계산 */
왜 2의 거듭제곱? 크기가 2^n이면 index % size를 index & (size-1)로 대체할 수 있습니다. 모듈로 연산은 수십 사이클이 걸리지만, 비트 AND는 1사이클만에 끝납니다.
Wrap-around 인덱스 비트 레벨 동작
인덱스가 오버플로우될 때 비트 마스킹으로 어떻게 wrap-around되는지 시각화합니다:
kfifo API
struct kfifo 구조
/* include/linux/kfifo.h */
struct __kfifo {
unsigned int in; /* 쓰기 인덱스 (계속 증가) */
unsigned int out; /* 읽기 인덱스 (계속 증가) */
unsigned int mask; /* size - 1 (비트 마스크) */
unsigned int esize; /* 원소 크기 (바이트) */
void *data; /* 실제 버퍼 */
};
#define DECLARE_KFIFO(fifo, type, size) \
struct __kfifo fifo = { \
.in = 0, .out = 0, .mask = size - 1, \
.esize = sizeof(type), \
.data = (void *)array \
}
할당과 해제 (Allocation & Deallocation)
/* 동적 할당 */
int kfifo_alloc(struct kfifo *fifo, unsigned int size, gfp_t gfp_mask);
/* 사용 예 */
struct kfifo my_fifo;
int ret = kfifo_alloc(&my_fifo, 1024, GFP_KERNEL);
if (ret) {
pr_err("kfifo_alloc failed\n");
return ret;
}
/* 정적 할당 (컴파일 타임) */
DECLARE_KFIFO_PTR(my_fifo, int);
kfifo_alloc(&my_fifo, 128, GFP_KERNEL);
/* 해제 */
void kfifo_free(struct kfifo *fifo);
삽입과 제거 (Enqueue & Dequeue)
/* 데이터 삽입 (enqueue) */
unsigned int kfifo_in(struct kfifo *fifo, const void *buf,
unsigned int len);
/* 반환값: 실제로 삽입된 바이트 수 (버퍼 가득 차면 len보다 작을 수 있음) */
/* 데이터 제거 (dequeue) */
unsigned int kfifo_out(struct kfifo *fifo, void *buf,
unsigned int len);
/* 반환값: 실제로 읽은 바이트 수 (버퍼 비어있으면 len보다 작을 수 있음) */
/* Peek (읽지 않고 확인만) */
unsigned int kfifo_out_peek(struct kfifo *fifo, void *buf,
unsigned int len);
/* 사용 예 */
char data[64] = "Hello, kfifo!";
unsigned int ret;
ret = kfifo_in(&my_fifo, data, strlen(data));
pr_info("Wrote %u bytes\n", ret);
char output[64];
ret = kfifo_out(&my_fifo, output, sizeof(output));
pr_info("Read %u bytes: %s\n", ret, output);
상태 확인 (Query Operations)
/* 버퍼 크기 */
unsigned int kfifo_size(struct kfifo *fifo);
/* 사용 가능한 바이트 수 */
unsigned int kfifo_avail(struct kfifo *fifo);
/* 저장된 데이터 바이트 수 */
unsigned int kfifo_len(struct kfifo *fifo);
/* 비어있는가? */
bool kfifo_is_empty(struct kfifo *fifo);
/* 가득 찼는가? */
bool kfifo_is_full(struct kfifo *fifo);
/* 초기화 (데이터 모두 삭제) */
void kfifo_reset(struct kfifo *fifo);
Lock-free 원리 (Lock-Free Mechanism)
kfifo는 단일 생산자/단일 소비자 구성에서 lock 없이 안전하게 동작합니다. 핵심 원리:
- 독립적인 인덱스:
in은 producer만,out은 consumer만 수정합니다. - 메모리 배리어(Memory Barrier):
smp_wmb()와smp_rmb()로 순서를 보장합니다. - 단조 증가:
in과out은 절대 감소하지 않으므로 경합(Contention)이 발생하지 않습니다.
/* lib/kfifo.c - kfifo_in 간략화 버전 */
unsigned int kfifo_in(struct __kfifo *fifo, const void *buf,
unsigned int len)
{
unsigned int l;
/* 사용 가능한 공간 계산 */
l = kfifo_unused(fifo);
if (len > l)
len = l;
/* 데이터 복사 */
__kfifo_in_data(fifo, buf, len, fifo->in);
/* 쓰기 배리어: 데이터 쓰기가 in 업데이트보다 먼저 보이도록 보장 */
smp_wmb();
/* in 인덱스 업데이트 (producer만 수정) */
fifo->in += len;
return len;
}
/* kfifo_out 간략화 버전 */
unsigned int kfifo_out(struct __kfifo *fifo, void *buf,
unsigned int len)
{
unsigned int l;
/* 사용 가능한 데이터 계산 */
l = fifo->in - fifo->out;
if (len > l)
len = l;
/* 읽기 배리어: in 읽기가 데이터 읽기보다 먼저 보이도록 보장 */
smp_rmb();
/* 데이터 복사 */
__kfifo_out_data(fifo, buf, len, fifo->out);
/* out 인덱스 업데이트 (consumer만 수정) */
fifo->out += len;
return len;
}
다중 생산자/소비자: 여러 CPU에서 동시에 kfifo_in()을 호출하거나, 여러 CPU에서 kfifo_out()을 호출하면 경합이 발생합니다. 이 경우 spinlock으로 보호해야 합니다.
메모리 배리어 동작 원리
Lock-free kfifo가 안전하게 동작하는 핵심은 메모리 배리어입니다. 다중 CPU 환경에서의 동작을 시각화합니다:
메모리 배리어의 역할: CPU는 성능을 위해 메모리 연산 순서를 재배치(Relocation)할 수 있습니다. 메모리 배리어는 특정 순서를 강제하여 다른 CPU에서 올바른 순서로 보이도록 보장합니다. lock은 배리어를 포함하지만, 배리어만 사용하면 훨씬 가볍습니다.
실사용 사례 (Real-World Usage)
디바이스 드라이버 버퍼링
문자 디바이스 드라이버에서 IRQ 핸들러(Handler)와 read() 시스템 콜(System Call) 사이의 버퍼:
/* drivers/tty/serial/serial_core.c 유사 패턴 */
struct uart_port {
struct kfifo rx_fifo;
/* ... */
};
/* IRQ 핸들러 (producer) */
static irqreturn_t uart_irq_handler(int irq, void *dev_id)
{
struct uart_port *port = dev_id;
char ch;
/* 하드웨어에서 문자 읽기 */
ch = readb(port->membase + UART_RX_REG);
/* kfifo에 저장 (lock-free!) */
kfifo_in(&port->rx_fifo, &ch, 1);
return IRQ_HANDLED;
}
/* read() 시스템 콜 (consumer) */
static ssize_t uart_read(struct file *file, char __user *buf,
size_t count, loff_t *ppos)
{
struct uart_port *port = file_private_data(file);
char tmp[128];
unsigned int copied;
/* kfifo에서 읽기 (lock-free!) */
copied = kfifo_out(&port->rx_fifo, tmp, min(count, sizeof(tmp)));
if (copy_to_user(buf, tmp, copied))
return -EFAULT;
return copied;
}
커널 로그 버퍼
/* kernel/printk/printk.c 유사 패턴 */
static DEFINE_KFIFO(printk_fifo, char, 1024 * 16);
int vprintk_emit(const char *fmt, va_list args)
{
char text[512];
int len;
len = vsnprintf(text, sizeof(text), fmt, args);
/* 로그 메시지를 kfifo에 저장 */
kfifo_in(&printk_fifo, text, len);
/* 콘솔 출력 등 */
console_unlock();
return len;
}
실습 예제 (Hands-On Example)
간단한 문자 디바이스 드라이버로 kfifo를 활용하는 예제입니다:
/* chardev_kfifo.c - kfifo 기반 문자 디바이스 */
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/uaccess.h>
#define DEVICE_NAME "kfifo_dev"
#define FIFO_SIZE 1024
static dev_t dev_num;
static struct cdev c_dev;
static struct class *dev_class;
static DEFINE_KFIFO(my_fifo, char, FIFO_SIZE);
static ssize_t dev_read(struct file *f, char __user *buf,
size_t len, loff_t *off)
{
char tmp[256];
unsigned int copied;
copied = kfifo_out(&my_fifo, tmp, min(len, sizeof(tmp)));
if (copied == 0)
return 0; /* EOF */
if (copy_to_user(buf, tmp, copied))
return -EFAULT;
pr_info("Read %u bytes\n", copied);
return copied;
}
static ssize_t dev_write(struct file *f, const char __user *buf,
size_t len, loff_t *off)
{
char tmp[256];
unsigned int copied;
size_t to_copy = min(len, sizeof(tmp));
if (copy_from_user(tmp, buf, to_copy))
return -EFAULT;
copied = kfifo_in(&my_fifo, tmp, to_copy);
pr_info("Wrote %u bytes (available: %u)\n",
copied, kfifo_avail(&my_fifo));
return copied;
}
static struct file_operations fops = {
.owner = THIS_MODULE,
.read = dev_read,
.write = dev_write,
};
static int __init chardev_init(void)
{
if (alloc_chrdev_region(&dev_num, 0, 1, DEVICE_NAME) < 0)
return -1;
cdev_init(&c_dev, &fops);
if (cdev_add(&c_dev, dev_num, 1) < 0) {
unregister_chrdev_region(dev_num, 1);
return -1;
}
dev_class = class_create(THIS_MODULE, DEVICE_NAME);
device_create(dev_class, NULL, dev_num, NULL, DEVICE_NAME);
pr_info("kfifo device registered (buffer=%u bytes)\n",
kfifo_size(&my_fifo));
return 0;
}
static void __exit chardev_exit(void)
{
device_destroy(dev_class, dev_num);
class_destroy(dev_class);
cdev_del(&c_dev);
unregister_chrdev_region(dev_num, 1);
pr_info("kfifo device unregistered\n");
}
module_init(chardev_init);
module_exit(chardev_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("MINZKN");
MODULE_DESCRIPTION("kfifo character device example");
사용 예제
# 모듈 로드
sudo insmod chardev_kfifo.ko
# 디바이스 확인
ls -l /dev/kfifo_dev
# 데이터 쓰기
echo "Hello, kfifo!" > /dev/kfifo_dev
# 데이터 읽기
cat /dev/kfifo_dev
# 출력: Hello, kfifo!
# 커널 로그 확인
dmesg | tail
# kfifo device registered (buffer=1024 bytes)
# Wrote 14 bytes (available: 1010)
# Read 14 bytes
성능 최적화 (Performance Optimization)
| 기법 | 설명 | 효과 |
|---|---|---|
| 2의 거듭제곱 크기 | 모듈로 대신 비트 마스킹 사용 | 인덱스 계산 10x 빠름 |
| Lock-free SPSC | 단일 생산자/소비자에서 lock 제거 | 동기화 오버헤드 제거 |
| 메모리 배리어 | smp_wmb/smp_rmb로 순서 보장(Ordering) | 락보다 저렴 |
| 연속 메모리 | 캐시 라인(Cache Line) 활용 | 캐시 미스 감소 |
벤치마크: lock-free kfifo는 spinlock 기반 큐보다 약 3-5배 빠르며, 특히 고빈도 IRQ 환경에서 성능 차이가 극대화됩니다.
kfifo 동시성 검증 체크리스트
kfifo는 SPSC에서 lock-free가 가능하지만, MPMC 상황에서는 반드시 외부 동기화가 필요합니다. 사용 패턴을 먼저 고정한 뒤 API를 선택해야 안전합니다.
| 시나리오 | 권장 방식 | 주의점 |
|---|---|---|
| 단일 생산자 / 단일 소비자 | lock-free kfifo | 메모리 배리어 의미를 이해하고 사용 |
| 다중 생산자 또는 다중 소비자 | spinlock + kfifo | irq 컨텍스트면 irqsave 계열 사용 |
| 유저 공간 복사 포함 | kfifo_to_user/kfifo_from_user | 부분 복사/오류 처리 분기 점검 |
# 빈번한 오용 점검
git grep -n "kfifo_in\\|kfifo_out\\|kfifo_to_user\\|kfifo_from_user" -- "*.c"
git grep -n "spin_lock.*kfifo" -- "*.c"
Power-of-2 마스킹 최적화
kfifo의 성능 핵심은 버퍼 크기를 2의 거듭제곱으로 강제하는 설계입니다. 이 제약 덕분에 비용이 큰 나눗셈/모듈로 연산을 비트 AND 한 번으로 대체할 수 있습니다.
2의 거듭제곱 검증 원리
커널은 size & (size - 1) == 0 패턴으로 2의 거듭제곱 여부를 판별합니다. 이 기법은 2의 거듭제곱이 항상 단 하나의 비트만 1인 점을 활용합니다.
커널 소스에서 이 검증이 어떻게 사용되는지 확인합니다:
/* include/linux/kfifo.h */
/**
* kfifo_alloc - 동적 kfifo 할당
* @fifo: 초기화할 fifo 포인터
* @size: 요청 크기 (2의 거듭제곱으로 반올림됨)
* @gfp_mask: GFP 플래그
*/
static inline int __kfifo_alloc(struct __kfifo *fifo,
unsigned int size, size_t esize, gfp_t gfp_mask)
{
/* 2의 거듭제곱으로 반올림 */
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize;
/* mask = size - 1: 모든 인덱스 연산에 사용 */
fifo->mask = size - 1;
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data)
return -ENOMEM;
return 0;
}
마스킹 인덱스 상세 동작
마스킹 연산의 핵심은 in과 out이 무한히 증가하도록 두고, 실제 배열 인덱스만 마스크로 계산하는 것입니다. 이 설계 덕분에 랩어라운드 판단이 불필요합니다.
/* include/linux/kfifo.h - roundup_pow_of_two 내부 */
/**
* roundup_pow_of_two - 주어진 값보다 크거나 같은 최소 2의 거듭제곱 반환
*
* 예: roundup_pow_of_two(5) = 8
* roundup_pow_of_two(8) = 8
* roundup_pow_of_two(9) = 16
*/
static inline unsigned long __roundup_pow_of_two(unsigned long n)
{
return 1UL << fls_long(n - 1);
}
/* fls_long: find last set bit (최상위 설정 비트 위치)
* fls_long(4) = 3 (bit 2), fls_long(7) = 3
* 1UL << 3 = 8
*/
Lock-free 정확성 증명
kfifo의 SPSC(단일 생산자/단일 소비자) lock-free 동작이 정확한 이유를 메모리 순서 관점에서 증명합니다.
핵심은 smp_wmb()와 smp_rmb()가 올바른 위치에서 관찰 순서를 보장하는 점입니다.
메모리 순서와 SPSC 안전성
이 순서 보장이 왜 안전한지 형식적으로 정리합니다:
- 불변식 1:
in은 생산자만 수정하고,out은 소비자만 수정합니다 (변수 소유권 분리). - 불변식 2: 생산자는 데이터를 쓴 후에
smp_wmb()를 호출하고, 그 후에in을 증가시킵니다. - 불변식 3: 소비자는
in을 읽은 후에smp_rmb()를 호출하고, 그 후에 데이터를 읽습니다. - 결론: 소비자가 새로운
in값을 관찰했다면,wmb→rmb쌍에 의해 대응하는 데이터도 반드시 관찰 가능합니다. 따라서 stale 데이터 읽기가 불가능합니다.
다중 생산자 문제
두 생산자가 동시에 kfifo에 쓰려 하면 어떤 문제가 발생하는지 봅니다:
__kfifo_in()과 __kfifo_out()의 실제 구현에서 배리어 배치를 확인합니다:
/* lib/kfifo.c - __kfifo_in() 핵심 흐름 */
static void kfifo_copy_in(struct __kfifo *fifo,
const void *src, unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask; /* 실제 배열 인덱스 */
/* esize가 0이면 바이트 단위 */
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off); /* 끝까지 복사 가능한 양 */
memcpy(fifo->data + off, src, l); /* 첫 번째 조각 */
memcpy(fifo->data, src + l, len - l); /* wrap 부분 (있으면) */
}
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo); /* 빈 공간 계산 */
if (len > l)
len = l; /* 빈 공간만큼만 쓰기 */
kfifo_copy_in(fifo, buf, len, fifo->in);
/* ★ 핵심: 데이터 쓰기 완료 후 write barrier */
smp_wmb();
/* 그 후에 in 갱신 → 소비자가 새 데이터를 볼 수 있음 */
fifo->in += len;
return len;
}
unsigned int __kfifo_out(struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
l = fifo->in - fifo->out; /* 사용 가능한 데이터 */
if (len > l)
len = l;
/* ★ 핵심: in 읽기 후 read barrier */
smp_rmb();
kfifo_copy_out(fifo, buf, len, fifo->out);
/* 데이터 읽기 완료 후 write barrier */
smp_wmb();
/* 그 후에 out 갱신 → 생산자가 공간 회수 가능 */
fifo->out += len;
return len;
}
smp_wmb()는 반드시 대응하는 smp_rmb()와 쌍을 이루어야 합니다.
생산자의 wmb(데이터→in)가 소비자의 rmb(in→데이터)와 짝을 이루어 크로스-CPU 가시성을 보장합니다.
배리어 하나만 있으면 다른 CPU에서 순서 재배치가 발생할 수 있습니다.
kfifo 내부 구조 분석
kfifo의 핵심 데이터 구조인 struct __kfifo의 각 필드와 헬퍼 함수들이 어떻게 동작하는지 상세히 분석합니다.
구조체(Struct) 레이아웃
/* include/linux/kfifo.h */
struct __kfifo {
unsigned int in; /* 쓰기 오프셋 (생산자만 수정) */
unsigned int out; /* 읽기 오프셋 (소비자만 수정) */
unsigned int mask; /* size - 1, 인덱스 마스킹용 */
unsigned int esize; /* 요소 크기 (바이트), 0이면 바이트 스트림 */
void *data; /* 실제 데이터 버퍼 포인터 */
};
unsigned 산술과 래핑 정확성
in과 out이 unsigned int 범위(0 ~ 232-1)를 넘어 오버플로우해도 올바르게 동작하는 이유를 분석합니다.
/* include/linux/kfifo.h - 헬퍼 함수들 */
/* 현재 저장된 데이터 개수 */
static inline unsigned int kfifo_len(struct __kfifo *fifo)
{
return fifo->in - fifo->out; /* unsigned 뺄셈 → 자동 wrap */
}
/* 남은 빈 공간 */
static inline unsigned int kfifo_avail(struct __kfifo *fifo)
{
return kfifo_size(fifo) - kfifo_len(fifo);
}
/* 비어 있는가? */
static inline bool kfifo_is_empty(struct __kfifo *fifo)
{
return fifo->in == fifo->out; /* 같으면 비어 있음 */
}
/* 가득 찼는가? */
static inline bool kfifo_is_full(struct __kfifo *fifo)
{
return kfifo_len(fifo) > fifo->mask;
/* mask = size-1 이므로, len > mask ↔ len >= size */
}
/* 전체 크기 */
static inline unsigned int kfifo_size(struct __kfifo *fifo)
{
return fifo->mask + 1;
}
in/out을 리셋하지 않는 이유는 단순합니다.
리셋에는 양쪽(생산자/소비자) 모두의 동의가 필요하고, 이는 동기화를 요구합니다.
unsigned 오버플로우의 수학적 성질을 활용하면 리셋 없이도 무한히 동작할 수 있습니다.
DMA scatter/gather 연동
kfifo 데이터를 DMA 엔진으로 직접 전송할 때, 버퍼가 원형으로 wrap될 수 있으므로 scatter/gather 리스트가 필요합니다. 커널은 이를 위해 전용 DMA 헬퍼 API를 제공합니다.
DMA 영역 분할 문제
DMA 헬퍼 API
/* include/linux/kfifo.h - DMA 관련 매크로/함수 */
/**
* kfifo_dma_in_prepare - DMA 입력을 위한 scatterlist 준비
* @fifo: kfifo 포인터
* @sgl: scatterlist 배열 (최소 2개 엔트리)
* @nents: scatterlist 엔트리 수
* @len: 전송할 데이터 길이
*
* 반환: 실제 사용된 scatterlist 엔트리 수 (1 또는 2)
*/
#define kfifo_dma_in_prepare(fifo, sgl, nents, len) \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
struct scatterlist *__sgl = (sgl); \
unsigned int __len = (len); \
const size_t __recsize = sizeof(*__tmp->type); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
(__recsize) ? \
__kfifo_dma_in_prepare_r(__kfifo, __sgl, nents, __len, __recsize) : \
__kfifo_dma_in_prepare(__kfifo, __sgl, nents, __len); \
})
/**
* kfifo_dma_in_finish - DMA 입력 완료 통지
* @fifo: kfifo 포인터
* @len: 실제 DMA로 전송된 바이트 수
*
* DMA 전송 완료 후 호출하여 in 포인터를 갱신
*/
#define kfifo_dma_in_finish(fifo, len) \
__kfifo_dma_in_finish(&(fifo)->kfifo, len)
/**
* kfifo_dma_out_prepare - DMA 출력을 위한 scatterlist 준비
* kfifo_dma_out_finish - DMA 출력 완료 통지 (out 갱신)
*/
실제 디바이스 드라이버에서의 DMA + kfifo 통합 예제:
/* DMA scatter/gather로 kfifo에서 디바이스로 전송 */
static int my_dma_tx(struct my_device *dev)
{
struct scatterlist sg[2]; /* 최대 2개 (wrap 가능) */
unsigned int nents;
struct dma_async_tx_descriptor *desc;
sg_init_table(sg, 2);
/* kfifo에서 DMA용 scatterlist 준비 */
nents = kfifo_dma_out_prepare(&dev->tx_fifo, sg, 2,
kfifo_len(&dev->tx_fifo));
if (!nents)
return 0; /* 전송할 데이터 없음 */
/* scatterlist를 DMA 매핑 */
nents = dma_map_sg(dev->dev, sg, nents, DMA_TO_DEVICE);
/* DMA 디스크립터 생성 */
desc = dmaengine_prep_slave_sg(dev->dma_chan, sg, nents,
DMA_MEM_TO_DEV,
DMA_PREP_INTERRUPT);
desc->callback = my_dma_tx_complete;
desc->callback_param = dev;
/* DMA 전송 시작 */
dmaengine_submit(desc);
dma_async_issue_pending(dev->dma_chan);
return 0;
}
/* DMA 완료 콜백 */
static void my_dma_tx_complete(void *param)
{
struct my_device *dev = param;
unsigned int len = kfifo_len(&dev->tx_fifo);
/* DMA 완료 후 kfifo out 포인터 갱신 */
kfifo_dma_out_finish(&dev->tx_fifo, len);
/* DMA 매핑 해제 */
dma_unmap_sg(dev->dev, dev->sg, 2, DMA_TO_DEVICE);
}
kfifo_dma_in_prepare()와 kfifo_dma_in_finish() 사이에
kfifo에 직접 데이터를 쓰면 안 됩니다. DMA 엔진이 물리 주소(Physical Address)를 기반으로 전송하므로,
중간에 데이터가 변경되면 일관성이 깨집니다. 반드시 finish() 호출 후에 다음 작업을 시작하세요.
레코드 모드 (Record FIFO)
기본 kfifo는 고정 크기 요소만 처리합니다. 가변 길이 데이터(로그 메시지, 네트워크 패킷 등)를 위해 커널은 레코드 모드를 제공합니다. 각 레코드 앞에 길이 접두사를 붙여 경계를 구분합니다.
레코드 레이아웃
/* include/linux/kfifo.h - 레코드 모드 선언 */
/* 1바이트 레코드 헤더: 레코드 최대 255바이트 */
struct my_rec1_fifo STRUCT_KFIFO_REC_1(256);
/* 2바이트 레코드 헤더: 레코드 최대 65535바이트 */
struct my_rec2_fifo STRUCT_KFIFO_REC_2(4096);
/* 초기화 */
INIT_KFIFO(my_fifo);
/* 레코드 넣기: 길이 접두사가 자동으로 추가됨 */
struct log_entry {
u64 timestamp;
u32 level;
char msg[200];
};
struct log_entry entry = {
.timestamp = ktime_get_ns(),
.level = KERN_WARNING,
.msg = "디스크 I/O 지연 감지",
};
/* kfifo_in은 sizeof(entry) 바이트 + 길이 접두사를 저장 */
kfifo_in(&my_fifo, &entry, sizeof(entry));
/* 짧은 메시지도 같은 fifo에 저장 가능 (가변 길이!) */
struct log_entry short_entry = {
.timestamp = ktime_get_ns(),
.level = KERN_INFO,
.msg = "OK",
};
kfifo_in(&my_fifo, &short_entry, offsetof(struct log_entry, msg) + 3);
/* 레코드 꺼내기: 저장된 길이만큼만 복사 */
struct log_entry out_entry;
unsigned int len = kfifo_out(&my_fifo, &out_entry, sizeof(out_entry));
/* len = 실제 레코드 크기 (접두사 제외) */
/* 다음 레코드 크기 미리 확인 (꺼내지 않고) */
unsigned int next_len = kfifo_peek_len(&my_fifo);
- 커널 로그 버퍼: 가변 길이 로그 메시지 저장
- Netlink 메시지 큐잉: 크기가 다른 netlink 메시지 버퍼링
- USB 요청 블록: 가변 크기 URB 데이터 임시 저장
- 오디오 프레임: 다양한 크기의 오디오 패킷 버퍼링
문자 디바이스 드라이버 통합
kfifo는 문자 디바이스 드라이버에서 사용자 공간(User Space)과 커널 공간(Kernel Space) 사이의 데이터 교환 버퍼로 자주 사용됩니다.
kfifo_to_user()와 kfifo_from_user()가 copy_to_user()/copy_from_user()를 내부적으로 처리합니다.
데이터 흐름
완전한 문자 디바이스 드라이버 예제
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/poll.h>
#include <linux/wait.h>
#define FIFO_SIZE 1024 /* 2의 거듭제곱 */
#define DEV_NAME "kfifo_example"
static struct cdev my_cdev;
static dev_t dev_num;
static struct class *dev_class;
/* 정적 kfifo 선언 (바이트 스트림 모드) */
static DECLARE_KFIFO(my_fifo, unsigned char, FIFO_SIZE);
/* 읽기 대기 큐 */
static DECLARE_WAIT_QUEUE_HEAD(read_wq);
static DECLARE_WAIT_QUEUE_HEAD(write_wq);
/* 다중 접근 보호용 뮤텍스 */
static DEFINE_MUTEX(fifo_lock);
static ssize_t my_read(struct file *file,
char __user *ubuf, size_t count, loff_t *ppos)
{
int ret;
unsigned int copied;
/* non-blocking 모드에서 비어있으면 즉시 반환 */
if (kfifo_is_empty(&my_fifo)) {
if (file->f_flags & O_NONBLOCK)
return -EAGAIN;
/* blocking 모드: 데이터가 올 때까지 대기 */
ret = wait_event_interruptible(read_wq,
!kfifo_is_empty(&my_fifo));
if (ret)
return ret;
}
mutex_lock(&fifo_lock);
ret = kfifo_to_user(&my_fifo, ubuf, count, &copied);
mutex_unlock(&fifo_lock);
if (ret)
return ret;
/* 공간이 생겼으므로 write 대기자 깨우기 */
if (!kfifo_is_full(&my_fifo))
wake_up_interruptible(&write_wq);
return copied;
}
static ssize_t my_write(struct file *file,
const char __user *ubuf, size_t count, loff_t *ppos)
{
int ret;
unsigned int copied;
if (kfifo_is_full(&my_fifo)) {
if (file->f_flags & O_NONBLOCK)
return -EAGAIN;
ret = wait_event_interruptible(write_wq,
!kfifo_is_full(&my_fifo));
if (ret)
return ret;
}
mutex_lock(&fifo_lock);
ret = kfifo_from_user(&my_fifo, ubuf, count, &copied);
mutex_unlock(&fifo_lock);
if (ret)
return ret;
/* 데이터가 들어왔으므로 read 대기자 깨우기 */
if (!kfifo_is_empty(&my_fifo))
wake_up_interruptible(&read_wq);
return copied;
}
static __poll_t my_poll(struct file *file,
struct poll_table_struct *wait)
{
__poll_t mask = 0;
poll_wait(file, &read_wq, wait);
poll_wait(file, &write_wq, wait);
if (!kfifo_is_empty(&my_fifo))
mask |= EPOLLIN | EPOLLRDNORM; /* 읽기 가능 */
if (!kfifo_is_full(&my_fifo))
mask |= EPOLLOUT | EPOLLWRNORM; /* 쓰기 가능 */
return mask;
}
static const struct file_operations my_fops = {
.owner = THIS_MODULE,
.read = my_read,
.write = my_write,
.poll = my_poll,
.llseek = noop_llseek,
};
static int __init kfifo_dev_init(void)
{
int ret;
INIT_KFIFO(my_fifo);
ret = alloc_chrdev_region(&dev_num, 0, 1, DEV_NAME);
if (ret < 0)
return ret;
cdev_init(&my_cdev, &my_fops);
ret = cdev_add(&my_cdev, dev_num, 1);
if (ret)
goto err_cdev;
dev_class = class_create(DEV_NAME);
device_create(dev_class, NULL, dev_num, NULL, DEV_NAME);
pr_info("kfifo_example: /dev/%s 생성 완료\n", DEV_NAME);
return 0;
err_cdev:
unregister_chrdev_region(dev_num, 1);
return ret;
}
static void __exit kfifo_dev_exit(void)
{
device_destroy(dev_class, dev_num);
class_destroy(dev_class);
cdev_del(&my_cdev);
unregister_chrdev_region(dev_num, 1);
pr_info("kfifo_example: 디바이스 제거\n");
}
module_init(kfifo_dev_init);
module_exit(kfifo_dev_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("kfifo 기반 문자 디바이스 예제");
사용자 공간에서 테스트:
# 모듈 로드
sudo insmod kfifo_example.ko
# 데이터 쓰기
echo "Hello kfifo!" > /dev/kfifo_example
# 데이터 읽기
cat /dev/kfifo_example
# 출력: Hello kfifo!
# non-blocking 테스트 (비어있을 때)
dd if=/dev/kfifo_example of=/dev/null bs=1 count=1 iflag=nonblock
# dd: /dev/kfifo_example: Resource temporarily unavailable
kfifo_to_user()는 내부적으로 copy_to_user()를 호출하지만,
원형 버퍼의 wrap-around를 자동으로 처리합니다 (최대 2번의 copy_to_user() 호출).
직접 copy_to_user()를 사용하면 wrap 처리를 수동으로 해야 합니다.
네트워크 서브시스템 활용
kfifo는 네트워크 관련 드라이버에서 데이터 버퍼링에 널리 사용됩니다. 특히 UART/시리얼 드라이버에서 TX/RX 버퍼로 활용하는 패턴이 대표적입니다.
네트워크 패킷 버퍼링 패턴
커널 시리얼 드라이버에서의 kfifo 활용 코드:
/* drivers/tty/serial/ 기반 UART kfifo 패턴 */
/* IRQ 핸들러: 하드웨어 RX FIFO → kfifo */
static irqreturn_t uart_rx_irq(int irq, void *dev_id)
{
struct my_uart *uart = dev_id;
unsigned char ch;
while (uart_rx_ready(uart)) {
ch = uart_read_byte(uart);
/* lock-free: IRQ가 유일한 생산자 */
if (kfifo_in(&uart->rx_fifo, &ch, 1) == 0)
uart->rx_overrun++; /* 오버런 카운터 */
}
/* 워크큐에서 데이터 처리 예약 */
schedule_work(&uart->rx_work);
return IRQ_HANDLED;
}
/* 워크큐: kfifo → TTY 레이어 */
static void uart_rx_work(struct work_struct *work)
{
struct my_uart *uart = container_of(work,
struct my_uart, rx_work);
unsigned char buf[256];
unsigned int len;
/* lock-free: 워크큐가 유일한 소비자 */
while ((len = kfifo_out(&uart->rx_fifo, buf, sizeof(buf)))) {
tty_insert_flip_string(&uart->port, buf, len);
}
tty_flip_buffer_push(&uart->port);
}
/* TX: TTY 레이어 → kfifo → 하드웨어 */
static void uart_start_tx(struct uart_port *port)
{
struct my_uart *uart = to_my_uart(port);
unsigned char ch;
while (uart_tx_ready(uart) &&
kfifo_out(&uart->tx_fifo, &ch, 1)) {
uart_write_byte(uart, ch);
}
}
ftrace/perf 디버깅(Debugging) 실습
kfifo 기반 드라이버를 디버깅하고 성능을 측정하는 실무적인 방법을 다룹니다. ftrace, bpftrace, perf를 사용하여 kfifo 동작을 관찰하고 병목(Bottleneck)을 찾는 기법입니다.
ftrace로 kfifo 함수 추적
# kfifo 관련 커널 함수 목록 확인
cat /sys/kernel/debug/tracing/available_filter_functions | grep kfifo
# function tracer 설정
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo function > /sys/kernel/debug/tracing/current_tracer
# kfifo 함수만 필터링
echo '__kfifo_in __kfifo_out __kfifo_alloc' > \
/sys/kernel/debug/tracing/set_ftrace_filter
# 추적 시작
echo 1 > /sys/kernel/debug/tracing/tracing_on
# 드라이버 동작 유발 (예: 시리얼 포트 사용)
echo "test" > /dev/ttyS0
# 추적 결과 확인
cat /sys/kernel/debug/tracing/trace
# 예상 출력:
# kfifo_exampl-1234 [001] .... 12345.678: __kfifo_in <-uart_write
# kfifo_exampl-1234 [001] .... 12345.679: __kfifo_out <-uart_start_tx
# 추적 종료
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo nop > /sys/kernel/debug/tracing/current_tracer
bpftrace로 kfifo 채움 수준 모니터링
/* kfifo_monitor.bt - kfifo 채움 수준 실시간 모니터링 */
/* 실행: sudo bpftrace kfifo_monitor.bt */
/* __kfifo_in 진입 시 삽입 크기 히스토그램 */
kprobe:__kfifo_in
{
@in_size = hist(arg2); /* arg2 = len */
@in_count = count();
}
/* __kfifo_out 진입 시 추출 크기 히스토그램 */
kprobe:__kfifo_out
{
@out_size = hist(arg2);
@out_count = count();
}
/* __kfifo_in 반환값으로 실제 삽입된 양 확인 */
kretprobe:__kfifo_in
{
@actual_in = hist(retval);
if (retval == 0) {
@fifo_full = count(); /* kfifo가 꽉 차서 삽입 실패 */
}
}
/* 주기적 요약 (5초마다) */
interval:s:5
{
printf("--- kfifo 통계 (5초) ---\n");
printf("삽입 횟수: %lld, 추출 횟수: %lld\n",
@in_count, @out_count);
printf("kfifo full 발생: %lld\n", @fifo_full);
print(@in_size);
clear(@in_count); clear(@out_count); clear(@fifo_full);
}
perf probe로 처리량 측정
# kfifo 함수에 동적 probe 추가
sudo perf probe --add '__kfifo_in len=%dx:u32'
sudo perf probe --add '__kfifo_out len=%dx:u32'
# 10초간 kfifo 호출 통계 수집
sudo perf stat -e 'probe:__kfifo_in,probe:__kfifo_out' -a sleep 10
# 예상 출력:
# Performance counter stats for 'system wide':
# 1,234,567 probe:__kfifo_in
# 1,234,000 probe:__kfifo_out
# 10.001234 seconds time elapsed
# 호출 스택과 함께 기록
sudo perf record -g -e 'probe:__kfifo_in' -a sleep 5
sudo perf report --sort comm,symbol
# probe 정리
sudo perf probe --del '__kfifo_in'
sudo perf probe --del '__kfifo_out'
버퍼 오버플로(Buffer Overflow)우/언더플로우 디버깅
/* 디버깅용 kfifo 상태 출력 함수 */
static void kfifo_debug_dump(struct kfifo *fifo,
const char *label)
{
pr_debug("%s: in=%u out=%u len=%u avail=%u full=%d empty=%d\n",
label,
fifo->kfifo.in,
fifo->kfifo.out,
kfifo_len(fifo),
kfifo_avail(fifo),
kfifo_is_full(fifo),
kfifo_is_empty(fifo));
}
/* 오버플로우 감지 래퍼 */
static unsigned int kfifo_in_debug(struct kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int avail = kfifo_avail(fifo);
unsigned int ret;
if (len > avail) {
pr_warn("kfifo: 오버플로우 시도! 요청=%u 가용=%u\n",
len, avail);
dump_stack(); /* 호출 스택 출력 */
}
ret = kfifo_in(fifo, buf, len);
if (ret != len)
pr_warn("kfifo: 부분 삽입 %u/%u\n", ret, len);
return ret;
}
pr_debug()는 CONFIG_DYNAMIC_DEBUG가 설정되면 런타임에 켜고 끌 수 있으므로,
pr_info()나 printk() 대신 pr_debug()를 사용하세요.
echo 'module my_driver +p' > /sys/kernel/debug/dynamic_debug/control로 활성화합니다.
kfifo vs 대안 비교
커널에는 kfifo 외에도 여러 링 버퍼(Ring Buffer) 구현이 있습니다. 각각의 설계 목표와 사용 시나리오가 다르므로, 적절한 선택이 중요합니다.
주요 링 버퍼 비교
| 특성 | kfifo | ring_buffer (ftrace) | io_uring SQ/CQ | virtio vring |
|---|---|---|---|---|
| 주 용도 | 범용 데이터 전달 | 이벤트 트레이싱 | 비동기 I/O | 가상화(Virtualization) I/O |
| 동시성 | SPSC lock-free | MPSC lock-free | SPSC lock-free | SPSC/MPSC |
| 요소 크기 | 고정/가변 (레코드 모드) | 가변 (이벤트 헤더) | 고정 (SQE/CQE) | 고정 (descriptor) |
| 메모리 공유 | 커널 내부 | 커널→유저 (mmap) | 커널↔유저 (mmap) | 호스트↔게스트 |
| 오버플로우 처리 | 삽입 거부 (부분 삽입) | 오래된 데이터 덮어쓰기 | 삽입 거부 | 삽입 거부 |
| 크기 제약 | 2의 거듭제곱 | 페이지(Page) 정렬 | 2의 거듭제곱 | 2의 거듭제곱 |
| DMA 지원 | scatter/gather API | 없음 | 없음 | scatter/gather |
| 복잡도 | 낮음 (~400줄) | 높음 (~3000줄) | 중간 (~2000줄) | 중간 (~1500줄) |
| 헤더 파일 | linux/kfifo.h |
linux/ring_buffer.h |
linux/io_uring.h |
linux/virtio_ring.h |
ring_buffer의 MPSC 설계에서 유사한 아이디어가 사용됩니다.
소스 코드 워크스루
lib/kfifo.c와 include/linux/kfifo.h의 핵심 함수들을 호출 순서대로 분석합니다.
전체 소스는 약 400줄로, 간결하면서도 견고한 구현입니다.
주요 함수 호출 그래프
__kfifo_alloc() 상세
/* lib/kfifo.c */
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* 크기를 2의 거듭제곱으로 반올림
* 예: size=6 → 8, size=9 → 16
*/
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize; /* 요소 크기 (바이트) */
if (size < 2) {
/* 최소 크기 2: in != out 조건으로 empty/full 구별 */
fifo->data = NULL;
fifo->mask = 0;
return -EINVAL;
}
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
fifo->mask = size - 1; /* 모든 인덱싱에 사용될 마스크 */
return 0;
}
memcpy 분할 복사 상세
원형 버퍼에서 wrap이 발생하면 데이터가 버퍼 끝과 처음에 나뉩니다. kfifo_copy_in()과 kfifo_copy_out()은 이를 최대 2번의 memcpy()로 처리합니다.
/* lib/kfifo.c - kfifo_copy_in 분석 */
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1; /* 전체 버퍼 크기 */
unsigned int esize = fifo->esize; /* 요소 크기 */
unsigned int l;
off &= fifo->mask; /* 실제 배열 인덱스: off mod size */
/* 바이트 단위로 변환 (esize가 1이 아닌 경우) */
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
/*
* l = 현재 위치(off)에서 버퍼 끝까지 복사 가능한 바이트 수
*
* 예: size=64, off=50, len=20
* l = min(20, 64-50) = min(20, 14) = 14
* 1차 memcpy: buf[50..63] ← src[0..13] (14바이트)
* 2차 memcpy: buf[0..5] ← src[14..19] (6바이트)
*
* 예: size=64, off=10, len=20
* l = min(20, 64-10) = min(20, 54) = 20
* 1차 memcpy: buf[10..29] ← src[0..19] (20바이트)
* 2차 memcpy: len-l = 0 → no-op
*/
l = min(len, size - off);
memcpy(fifo->data + off, src, l); /* 1차: 끝까지 */
memcpy(fifo->data, src + l, len - l); /* 2차: 처음부터 */
}
/* kfifo_copy_out도 동일한 분할 로직 (방향만 반대) */
static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(dst, fifo->data + off, l); /* 1차: 끝까지 */
memcpy(dst + l, fifo->data, len - l); /* 2차: 처음부터 */
}
/* __kfifo_free */
void __kfifo_free(struct __kfifo *fifo)
{
kfree(fifo->data);
fifo->in = 0;
fifo->out = 0;
fifo->esize = 0;
fifo->data = NULL;
fifo->mask = 0;
}
memcpy()는 길이 0으로 호출되어
컴파일러가 최적화합니다. 즉, 일반적인 경우 오버헤드 없이 단일 memcpy()만 실행됩니다.
GCC와 Clang 모두 길이 0인 memcpy()를 완전히 제거합니다.
include/linux/kfifo.h: 매크로(Macro), 인라인 함수(Inline Function), 타입 안전 래퍼 (약 700줄)lib/kfifo.c: 핵심 구현 함수 (약 400줄)samples/kfifo/: 커널 내장 예제 코드 (bytestream, inttype, record)
git log --oneline lib/kfifo.c로 변경 이력을 확인할 수 있습니다.
실전 디바이스 드라이버 활용
kfifo는 커널 전반의 다양한 서브시스템에서 비동기 데이터 버퍼로 활용됩니다. 실제 커널 소스에서 kfifo를 사용하는 대표적인 패턴들을 서브시스템별로 살펴봅니다.
UART/시리얼 드라이버의 TX/RX 버퍼
drivers/tty/serial/ 디렉터리의 시리얼 드라이버들은 kfifo를 송수신 버퍼로 사용합니다.
대표적으로 serial_core 프레임워크에서 uart_state 구조체가 TX 원형 버퍼를 관리하며,
8250 드라이버는 이를 기반으로 UART FIFO와 kfifo 사이에서 데이터를 교환합니다.
/* drivers/tty/serial/serial_core.c — TX 버퍼 초기화 */
static int uart_port_startup(struct tty_struct *tty,
struct uart_state *state, int init_hw)
{
struct uart_port *uport = uart_port_check(state);
unsigned long page;
int retval = 0;
/* TX 원형 버퍼용 페이지 할당 */
page = get_zeroed_page(GFP_KERNEL);
if (!page)
return -ENOMEM;
/* circ_buf 구조체로 kfifo와 동일한 원형 버퍼 패턴 사용 */
state->xmit.buf = (unsigned char *)page;
uart_circ_clear(&state->xmit);
retval = uport->ops->startup(uport);
return retval;
}
코드 설명
serial_core.c의 TX 버퍼 초기화 과정입니다.
- get_zeroed_page4KB 페이지를 0으로 초기화하여 할당합니다. TX 버퍼 크기가 PAGE_SIZE(4096)로 고정됩니다.
- circ_buf
struct circ_buf는 kfifo의 전신으로, head/tail 포인터와 buf 배열로 구성된 단순 원형 버퍼입니다. - uart_circ_clearhead와 tail을 0으로 초기화하여 빈 버퍼 상태를 만듭니다.
/* drivers/tty/serial/8250/8250_port.c — TX FIFO 채우기 */
static void serial8250_tx_chars(struct uart_8250_port *up)
{
struct uart_port *port = &up->port;
struct circ_buf *xmit = &port->state->xmit;
int count;
/* x_char(XON/XOFF)가 있으면 우선 전송 */
if (port->x_char) {
serial_out(up, UART_TX, port->x_char);
port->x_char = 0;
return;
}
/* TX 버퍼가 비었거나 전송 중지 상태이면 인터럽트 해제 */
if (uart_circ_empty(xmit) || uart_tx_stopped(port)) {
serial8250_stop_tx(port);
return;
}
count = up->tx_loadsz; /* UART FIFO 크기 (보통 16바이트) */
do {
/* 원형 버퍼에서 한 바이트씩 UART TX 레지스터에 쓰기 */
serial_out(up, UART_TX, xmit->buf[xmit->tail]);
xmit->tail = (xmit->tail + 1) & (UART_XMIT_SIZE - 1);
port->icount.tx++;
if (uart_circ_empty(xmit))
break;
} while (--count > 0);
/* 버퍼 여유 공간이 충분하면 상위 레이어에 통보 */
if (uart_circ_chars_pending(xmit) < WAKEUP_CHARS)
uart_write_wakeup(port);
}
코드 설명
8250 UART 드라이버의 TX 인터럽트 핸들러로, 원형 버퍼에서 데이터를 꺼내 UART 하드웨어 FIFO에 전송합니다.
- tx_loadszUART 하드웨어 FIFO 크기입니다. 16550A 호환 UART는 16바이트입니다.
- xmit->tail읽기 포인터를
(tail + 1) & (SIZE - 1)로 갱신합니다. kfifo의out & mask와 동일한 2의 거듭제곱 마스킹 기법입니다. - WAKEUP_CHARS버퍼 여유 공간이 이 임계값 이하로 떨어지면 TTY 레이어에 wake-up 신호를 보냅니다.
입력 서브시스템의 이벤트 버퍼링
입력 서브시스템(drivers/input/)에서는 input_event 구조체를 kfifo로 버퍼링하여
사용자 공간의 /dev/input/eventN 디바이스에 전달합니다.
evdev.c에서 클라이언트별 이벤트 큐가 kfifo로 구현되어 있습니다.
/* drivers/input/evdev.c — 클라이언트별 이벤트 버퍼 */
struct evdev_client {
unsigned int head; /* 쓰기 인덱스 */
unsigned int tail; /* 읽기 인덱스 */
unsigned int packet_head; /* SYN_REPORT 패킷 경계 */
unsigned int bufsize; /* 2의 거듭제곱 크기 */
struct input_event buffer[]; /* flexible array (kfifo 패턴) */
};
/* 이벤트 삽입 (IRQ 컨텍스트에서 호출) */
static void __evdev_queue_syn_dropped(struct evdev_client *client)
{
struct input_event ev;
ev.input_event_sec = 0;
ev.input_event_usec = 0;
ev.type = EV_SYN;
ev.code = SYN_DROPPED;
ev.value = 0;
/* head 인덱스를 마스크로 wrap → kfifo의 in & mask와 동일 */
client->buffer[client->head++ & (client->bufsize - 1)] = ev;
}
/* 이벤트 읽기 (사용자 공간 read()) */
static int evdev_fetch_next_event(struct evdev_client *client,
struct input_event *event)
{
if (client->packet_head == client->tail)
return 0;
spin_lock_irq(&client->buffer_lock);
*event = client->buffer[client->tail++ & (client->bufsize - 1)];
spin_unlock_irq(&client->buffer_lock);
return 1;
}
코드 설명
evdev.c는 kfifo와 동일한 원형 버퍼 패턴을 직접 구현하여 이벤트를 큐잉합니다.
- head++ & (bufsize - 1)kfifo의
in & mask와 동일한 래핑 기법입니다. bufsize는 2의 거듭제곱이어야 합니다. - packet_headSYN_REPORT 이벤트 경계를 추적합니다. 부분 패킷이 사용자 공간으로 전달되는 것을 방지합니다.
- buffer_lockevdev는 다중 스레드 읽기를 허용하므로 spinlock이 필요합니다. SPSC라면 lock-free가 가능합니다.
USB gadget 드라이버의 요청 큐잉
USB gadget 드라이버에서는 USB 요청(request)을 kfifo로 큐잉하여 비동기 전송을 관리합니다.
drivers/usb/gadget/function/ 디렉터리의 함수 드라이버들이 이 패턴을 사용합니다.
/* drivers/usb/gadget/function/f_serial.c — USB 시리얼 가젯 패턴 */
struct f_serial {
struct usb_function function;
struct usb_ep *ep_in;
struct usb_ep *ep_out;
DECLARE_KFIFO_PTR(tx_fifo, unsigned char);
DECLARE_KFIFO_PTR(rx_fifo, unsigned char);
struct list_head tx_reqs; /* 빈 TX 요청 풀 */
struct list_head rx_reqs; /* 빈 RX 요청 풀 */
spinlock_t lock;
};
/* TTY에서 USB 호스트로 TX 전송 */
static int gs_start_tx(struct f_serial *serial)
{
struct usb_request *req;
unsigned int len;
int status;
while (!list_empty(&serial->tx_reqs)) {
if (kfifo_len(&serial->tx_fifo) == 0)
break;
req = list_first_entry(&serial->tx_reqs,
struct usb_request, list);
/* kfifo에서 USB 요청 버퍼로 복사 */
len = kfifo_out(&serial->tx_fifo, req->buf, req->length);
req->length = len;
list_del(&req->list);
status = usb_ep_queue(serial->ep_in, req, GFP_ATOMIC);
if (status) {
list_add(&req->list, &serial->tx_reqs);
break;
}
}
return 0;
}
오디오 드라이버(ALSA)의 PCM 버퍼 관리
ALSA 오디오 드라이버에서는 PCM 데이터를 원형 버퍼로 관리합니다.
sound/core/pcm_lib.c의 링 버퍼는 kfifo와 동일한 원리로 동작하며,
하드웨어 포인터(hw_ptr)와 애플리케이션 포인터(appl_ptr)가 kfifo의 in/out 역할을 합니다.
/* sound/core/pcm_lib.c — PCM 링 버퍼 포인터 관리 */
static int snd_pcm_update_hw_ptr0(struct snd_pcm_substream *substream,
unsigned int in_interrupt)
{
struct snd_pcm_runtime *runtime = substream->runtime;
snd_pcm_uframes_t pos;
snd_pcm_sframes_t hdelta, delta;
/* 하드웨어 포인터 읽기 (DMA 컨트롤러의 현재 위치) */
pos = substream->ops->pointer(substream);
/* 원형 버퍼 래핑 처리 — kfifo의 in & mask와 동일한 원리 */
pos %= runtime->buffer_size;
/* 이전 hw_ptr과의 차이 계산 */
hdelta = pos - runtime->hw_ptr_base;
if (hdelta < 0)
hdelta += runtime->buffer_size; /* wrap-around 보정 */
/* 경과 프레임 수 = hw_ptr 진행량 */
delta = hdelta - runtime->hw_ptr_interrupt;
if (delta < 0)
delta += runtime->buffer_size;
runtime->hw_ptr_base = pos;
return 0;
}
/* ALSA PCM 버퍼 구조 — kfifo 패턴과의 대응 */
/*
* PCM 링 버퍼: kfifo 대응:
* ───────────── ──────────
* appl_ptr (응용) ←→ fifo.in (생산자)
* hw_ptr (하드웨어) ←→ fifo.out (소비자)
* buffer_size ←→ fifo.mask + 1
* period_size ←→ (해당 없음, 인터럽트 주기)
*/
DMA 연동 패턴
kfifo는 DMA 엔진과의 통합을 위해 전용 헬퍼 함수를 제공합니다. 원형 버퍼의 데이터가 물리적으로 연속되지 않을 수 있으므로(wrap-around), scatter-gather DMA를 통해 분할된 영역을 효율적으로 전송합니다.
DMA 헬퍼 API
kfifo의 DMA 관련 API는 prepare/finish 쌍으로 구성됩니다. prepare가 scatter-gather 리스트를 구성하고, finish가 in/out 포인터를 갱신합니다.
/* include/linux/kfifo.h — DMA 헬퍼 API */
/*
* kfifo_dma_in_prepare — DMA 수신용 scatterlist 준비
* @fifo: kfifo 포인터
* @sgl: scatterlist 배열
* @nents: scatterlist 엔트리 수 (최소 2)
* @len: 전송할 바이트 수
*
* 반환: 사용된 scatterlist 엔트리 수 (1 또는 2)
*/
#define kfifo_dma_in_prepare(fifo, sgl, nents, len) \
__kfifo_dma_in_prepare((fifo), sgl, nents, len)
/*
* kfifo_dma_in_finish — DMA 수신 완료 후 in 포인터 갱신
* @fifo: kfifo 포인터
* @len: 실제 수신된 바이트 수
*/
#define kfifo_dma_in_finish(fifo, len) \
__kfifo_dma_in_finish((fifo), len)
/*
* kfifo_dma_out_prepare — DMA 송신용 scatterlist 준비
* @fifo: kfifo 포인터
* @sgl: scatterlist 배열
* @nents: scatterlist 엔트리 수 (최소 2)
* @len: 전송할 바이트 수
*/
#define kfifo_dma_out_prepare(fifo, sgl, nents, len) \
__kfifo_dma_out_prepare((fifo), sgl, nents, len)
/*
* kfifo_dma_out_finish — DMA 송신 완료 후 out 포인터 갱신
* @fifo: kfifo 포인터
* @len: 실제 송신된 바이트 수
*/
#define kfifo_dma_out_finish(fifo, len) \
__kfifo_dma_out_finish((fifo), len)
DMA prepare 내부 구현
/* lib/kfifo.c — __kfifo_dma_in_prepare 내부 */
unsigned int __kfifo_dma_in_prepare_r(struct __kfifo *fifo,
struct scatterlist *sgl, int nents, unsigned int len, size_t recsize)
{
unsigned int size = fifo->mask + 1;
unsigned int off = fifo->in & fifo->mask; /* 물리 오프셋 */
unsigned int avail = kfifo_avail(fifo);
unsigned int l;
if (len > avail)
len = avail;
sg_init_table(sgl, nents);
/* 첫 번째 세그먼트: off부터 버퍼 끝까지 */
l = min(len, size - off);
sg_set_buf(sgl, fifo->data + off, l);
/* 두 번째 세그먼트: 버퍼 시작부터 나머지 */
if (len > l) {
sg_set_buf(sgl + 1, fifo->data, len - l);
return 2; /* scatter-gather 2개 엔트리 사용 */
}
sg_mark_end(sgl);
return 1; /* 연속 영역, 1개 엔트리로 충분 */
}
코드 설명
DMA 수신 준비 함수의 핵심 로직입니다. 원형 버퍼의 빈 공간을 scatterlist로 매핑합니다.
- off = fifo->in & fifo->mask현재 쓰기 위치의 물리 오프셋을 계산합니다. mask 연산으로 버퍼 크기 내로 wrap됩니다.
- l = min(len, size - off)첫 번째 세그먼트 길이입니다. off부터 버퍼 끝까지의 연속 공간과 요청 길이 중 작은 값입니다.
- sg_set_bufscatterlist 엔트리에 가상 주소와 길이를 설정합니다. DMA 매핑 시 물리 주소로 변환됩니다.
- return 2wrap-around가 발생하여 2개의 불연속 영역이 필요합니다. DMA 컨트롤러가 scatter-gather로 처리합니다.
DMA 연동 완전 예제
/* DMA 기반 시리얼 수신 예제 */
struct my_dma_serial {
struct dma_chan *rx_chan;
DECLARE_KFIFO(rx_fifo, u8, 4096);
struct scatterlist rx_sgl[2];
dma_cookie_t rx_cookie;
};
/* DMA 수신 시작 */
static int start_dma_rx(struct my_dma_serial *dev)
{
struct dma_async_tx_descriptor *desc;
unsigned int nents;
int ret;
/* 1단계: kfifo 빈 영역을 scatterlist로 준비 */
nents = kfifo_dma_in_prepare(&dev->rx_fifo,
dev->rx_sgl, 2, 256);
if (!nents)
return -ENOSPC;
/* 2단계: DMA 매핑 (가상 → 물리 주소 변환) */
ret = dma_map_sg(dev->rx_chan->device->dev,
dev->rx_sgl, nents, DMA_FROM_DEVICE);
if (!ret)
return -ENOMEM;
/* 3단계: DMA 디스크립터 생성 및 전송 시작 */
desc = dmaengine_prep_slave_sg(dev->rx_chan,
dev->rx_sgl, nents,
DMA_DEV_TO_MEM,
DMA_PREP_INTERRUPT);
desc->callback = dma_rx_complete;
desc->callback_param = dev;
dev->rx_cookie = dmaengine_submit(desc);
dma_async_issue_pending(dev->rx_chan);
return 0;
}
/* DMA 수신 완료 콜백 */
static void dma_rx_complete(void *param)
{
struct my_dma_serial *dev = param;
struct dma_tx_state state;
unsigned int received;
/* DMA 전송 상태 확인 */
dmaengine_tx_status(dev->rx_chan, dev->rx_cookie, &state);
received = 256 - state.residue;
/* DMA 매핑 해제 (캐시 무효화 포함) */
dma_unmap_sg(dev->rx_chan->device->dev,
dev->rx_sgl, 2, DMA_FROM_DEVICE);
/* 4단계: kfifo in 포인터 갱신 */
kfifo_dma_in_finish(&dev->rx_fifo, received);
/* 다음 DMA 전송 시작 */
start_dma_rx(dev);
}
코드 설명
kfifo DMA 연동의 4단계 패턴입니다: prepare → map → transfer → finish.
- kfifo_dma_in_preparekfifo의 빈 영역을 scatterlist로 변환합니다. wrap-around 시 2개, 연속 시 1개 엔트리를 생성합니다.
- dma_map_sgscatterlist의 가상 주소를 DMA 가능한 물리 주소로 매핑합니다. 캐시 일관성도 처리합니다.
- DMA_PREP_INTERRUPTDMA 전송 완료 시 인터럽트를 발생시키도록 요청합니다. 콜백이 이 인터럽트에서 호출됩니다.
- dma_unmap_sgDMA 매핑을 해제합니다. DMA_FROM_DEVICE이므로 CPU 캐시 무효화(invalidate)를 수행하여 캐시 일관성을 보장합니다.
- kfifo_dma_in_finish실제 수신된 바이트 수만큼 fifo->in 포인터를 전진시킵니다. smp_wmb()가 포함되어 있습니다.
DMA 캐시 일관성과 주의사항
DMA 캐시 일관성 규칙:
- DMA_TO_DEVICE (kfifo → 장치):
dma_map_sg()전에 kfifo 데이터가 완전히 기록되어야 합니다. 매핑 후 CPU가 데이터를 수정하면 안 됩니다. - DMA_FROM_DEVICE (장치 → kfifo):
dma_unmap_sg()후에만 CPU가 수신 데이터를 읽어야 합니다. 매핑 중 CPU 읽기는 stale 데이터를 반환합니다. - 정렬 요구사항: 일부 DMA 컨트롤러는 버스 폭(4/8/16바이트) 정렬을 요구합니다. kfifo 버퍼는
kmalloc()으로 할당되므로 자연 정렬이 보장됩니다. - kfifo_dma_in_finish/out_finish는 반드시
dma_unmap_sg()후에 호출해야 합니다.
/* scatter-gather DMA 출력 패턴 */
static int start_dma_tx(struct my_dma_serial *dev)
{
struct scatterlist tx_sgl[2];
struct dma_async_tx_descriptor *desc;
unsigned int nents, len;
len = kfifo_len(&dev->tx_fifo);
if (!len)
return 0;
/* kfifo 데이터 영역을 scatterlist로 준비 */
nents = kfifo_dma_out_prepare(&dev->tx_fifo,
tx_sgl, 2, len);
/* DMA 매핑 — CPU가 기록한 데이터를 flush */
dma_map_sg(dev->tx_chan->device->dev,
tx_sgl, nents, DMA_TO_DEVICE);
desc = dmaengine_prep_slave_sg(dev->tx_chan,
tx_sgl, nents,
DMA_MEM_TO_DEV,
DMA_PREP_INTERRUPT);
desc->callback = dma_tx_complete;
desc->callback_param = dev;
dmaengine_submit(desc);
dma_async_issue_pending(dev->tx_chan);
return len;
}
static void dma_tx_complete(void *param)
{
struct my_dma_serial *dev = param;
dma_unmap_sg(dev->tx_chan->device->dev,
dev->tx_sgl, 2, DMA_TO_DEVICE);
/* out 포인터 갱신 — 전송된 데이터를 kfifo에서 제거 */
kfifo_dma_out_finish(&dev->tx_fifo, dev->last_tx_len);
}
kfifo 내부 구현 심층 분석
kfifo의 핵심 함수들을 소스 코드 레벨에서 한 줄씩 분석합니다.
lib/kfifo.c와 include/linux/kfifo.h의 실제 구현을 기반으로 각 함수의 동작 원리와
메모리 배리어 배치, 래핑 처리 로직을 심층적으로 살펴봅니다.
__kfifo_alloc 내부: kmalloc + 2의 거듭제곱 정렬
/* lib/kfifo.c — __kfifo_alloc 전체 구현 */
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* size를 2의 거듭제곱으로 올림 정렬합니다.
* 예: size=100 → 128, size=1000 → 1024
* 이 정렬이 mask 기반 인덱싱의 핵심 전제조건입니다.
*/
if (!is_power_of_2(size)) {
if (size > UINT_MAX / 2)
return -EINVAL;
size = roundup_pow_of_two(size);
}
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize; /* 요소 크기 (바이트 스트림이면 0) */
if (esize < 2) {
/* 바이트 스트림 또는 1바이트 요소: 직접 kmalloc */
fifo->data = kmalloc(size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
} else {
/* 구조체 요소: 오버플로 방지를 위해 kmalloc_array 사용 */
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
}
/*
* mask = size - 1
* 예: size=1024 → mask=0x3FF (1023)
* 모든 인덱싱에서 modulo 대신 bitwise AND로 사용됩니다.
*/
fifo->mask = size - 1;
return 0;
}
코드 설명
kfifo 할당의 전체 과정입니다. 핵심은 2의 거듭제곱 정렬과 mask 설정입니다.
- roundup_pow_of_two
include/linux/log2.h에 정의된 매크로로, clz(count leading zeros) 명령어를 사용하여 O(1)에 다음 2의 거듭제곱을 계산합니다. 예: 100 → 128. - UINT_MAX / 2오버플로 방지 검사입니다. size가 UINT_MAX/2보다 크면 roundup 결과가 오버플로될 수 있습니다.
- esize < 2esize가 0(바이트 스트림)이면 mask가 요소 개수가 아닌 바이트 수를 나타냅니다. esize가 1이면 동일합니다.
- kmalloc_array
kmalloc(esize * size)와 달리 곱셈 오버플로를 검사합니다. 대형 구조체 배열 할당 시 안전합니다. - mask = size - 1이후 모든 인덱싱에서
index & mask로 modulo를 대체합니다. CPU의 DIV 명령어보다 AND가 훨씬 빠릅니다.
__kfifo_in: memcpy 분할 복사 로직
/* lib/kfifo.c — kfifo_copy_in 내부 */
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1; /* 버퍼 총 크기 */
unsigned int esize = fifo->esize; /* 요소 크기 */
unsigned int l;
/* 물리 오프셋 계산 (esize 곱셈) */
off &= fifo->mask; /* 논리 인덱스 → 버퍼 내 오프셋 */
if (esize != 0) {
off *= esize; /* 바이트 오프셋으로 변환 */
size *= esize; /* 바이트 기준 총 크기 */
len *= esize; /* 바이트 기준 복사 길이 */
}
/*
* l = 버퍼 끝까지 남은 바이트 수와 복사할 길이 중 작은 값
* wrap이 없으면: l == len → 두 번째 memcpy는 길이 0
* wrap이 있으면: l < len → 두 번의 memcpy로 분할
*/
l = min(len, size - off);
memcpy(fifo->data + off, src, l); /* 1차: off부터 끝까지 */
memcpy(fifo->data, src + l, len - l); /* 2차: 처음부터 나머지 */
}
/* lib/kfifo.c — __kfifo_in */
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
/* 가용 공간보다 많이 쓸 수 없음 */
l = kfifo_unused(fifo);
if (len > l)
len = l;
/* 데이터 복사 (분할 처리 포함) */
kfifo_copy_in(fifo, buf, len, fifo->in);
/*
* smp_wmb() — 쓰기 메모리 배리어
*
* 목적: 데이터 복사(memcpy)가 in 포인터 갱신보다
* 먼저 다른 CPU에서 관찰되도록 보장합니다.
*
* 없으면: 소비자 CPU가 새 in 값을 보고 아직 복사되지 않은
* 데이터를 읽는 경쟁 상태가 발생할 수 있습니다.
*/
smp_wmb();
fifo->in += len; /* in은 절대값으로 계속 증가 (오버플로 OK) */
return len;
}
코드 설명
__kfifo_in의 데이터 삽입 과정입니다. 핵심은 분할 복사와 메모리 배리어입니다.
- off &= fifo->mask32비트 정수 in의 절대값을 버퍼 크기 내 오프셋으로 변환합니다. 오버플로되어도 mask AND 연산으로 올바른 위치를 계산합니다.
- l = min(len, size - off)버퍼 끝까지의 연속 공간입니다. 이 값이 len보다 작으면 wrap-around가 발생하여 2번의 memcpy가 필요합니다.
- memcpy 1차현재 오프셋에서 버퍼 끝까지 복사합니다. wrap이 없으면 이것만으로 전체 복사가 완료됩니다.
- memcpy 2차나머지 데이터를 버퍼 시작점부터 복사합니다.
len - l이 0이면 컴파일러가 이 호출을 제거합니다. - smp_wmb()쓰기 메모리 배리어입니다. ARM/POWER에서는 실제 배리어 명령어가 삽입되고, x86에서는 컴파일러 배리어만 됩니다(TSO 보장).
- fifo->in += lenin은 절대값으로 단조 증가합니다. 32비트 오버플로가 발생해도
in - out차이는 정확합니다(부호 없는 산술).
__kfifo_out: 읽기 포인터 업데이트와 smp_wmb
/* lib/kfifo.c — __kfifo_out */
unsigned int __kfifo_out(struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
/* 현재 저장된 데이터보다 많이 읽을 수 없음 */
l = fifo->in - fifo->out;
if (len > l)
len = l;
/* 데이터 복사 (분할 처리 포함) */
kfifo_copy_out(fifo, buf, len, fifo->out);
/*
* smp_wmb() — 쓰기 메모리 배리어
*
* 목적: 데이터 복사가 out 포인터 갱신보다 먼저
* 완료되도록 보장합니다.
*
* 없으면: 생산자 CPU가 새 out 값을 보고 아직 읽기가
* 완료되지 않은 영역에 새 데이터를 덮어쓸 수 있습니다.
*/
smp_wmb();
fifo->out += len; /* out도 절대값으로 계속 증가 */
return len;
}
/* __kfifo_out_peek — 데이터를 읽되 out 포인터는 유지 */
unsigned int __kfifo_out_peek(struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
l = fifo->in - fifo->out;
if (len > l)
len = l;
kfifo_copy_out(fifo, buf, len, fifo->out);
/* peek이므로 out 포인터를 갱신하지 않음 */
/* smp_wmb()도 불필요 — 상태가 변경되지 않으므로 */
return len;
}
코드 설명
데이터 추출 함수와 peek 함수의 차이를 비교합니다.
- fifo->in - fifo->out사용 가능한 데이터 양을 계산합니다. 부호 없는 뺄셈이므로 in이 오버플로되어도 정확합니다.
- kfifo_copy_out
kfifo_copy_in과 대칭적인 함수로, out 오프셋부터 분할 복사를 수행합니다. - __kfifo_out의 smp_wmb데이터 읽기 완료 후 out 갱신 전에 배리어를 삽입합니다. 생산자가 이전 데이터를 덮어쓰지 않도록 보호합니다.
- __kfifo_out_peekout 포인터를 변경하지 않으므로 메모리 배리어가 불필요합니다. 반복 호출해도 같은 데이터를 반환합니다.
레코드 기반 kfifo의 recsize 인코딩
kfifo의 레코드 모드는 가변 길이 데이터를 지원합니다. 각 레코드 앞에 1~2바이트의 길이 헤더를 붙여
레코드 경계를 관리합니다. DEFINE_KFIFO(name, type, size)에서 세 번째 인자가
레코드 크기를 결정합니다.
/* include/linux/kfifo.h — 레코드 크기 인코딩 */
/*
* recsize = 0: 고정 크기 요소 (일반 kfifo)
* recsize = 1: 레코드 길이 1바이트 헤더 (최대 255바이트)
* recsize = 2: 레코드 길이 2바이트 헤더 (최대 65535바이트)
*/
/* 레코드 헤더 기록 */
static void __kfifo_poke_n(struct __kfifo *fifo,
unsigned int n, unsigned int recsize)
{
if (recsize == 1) {
/* 1바이트 헤더: 직접 기록 */
*((u8 *)(fifo->data + (fifo->in & fifo->mask))) = (u8)n;
} else {
/* 2바이트 헤더: 리틀 엔디안(Little-Endian)으로 기록 */
*((u8 *)(fifo->data + (fifo->in & fifo->mask))) =
(u8)(n & 0xFF);
*((u8 *)(fifo->data + ((fifo->in + 1) & fifo->mask))) =
(u8)((n >> 8) & 0xFF);
}
}
/* 레코드 헤더 읽기 */
static unsigned int __kfifo_peek_n(struct __kfifo *fifo,
unsigned int recsize)
{
unsigned int l;
if (recsize == 1) {
l = *((u8 *)(fifo->data + (fifo->out & fifo->mask)));
} else {
l = *((u8 *)(fifo->data + (fifo->out & fifo->mask)));
l |= *((u8 *)(fifo->data + ((fifo->out + 1) & fifo->mask)))
<< 8;
}
return l;
}
/* 레코드 삽입: 헤더 + 데이터 */
unsigned int __kfifo_in_r(struct __kfifo *fifo,
const void *buf, unsigned int len,
size_t recsize)
{
if (len + recsize > kfifo_unused(fifo))
return 0;
/* 1단계: 레코드 길이 헤더 기록 */
__kfifo_poke_n(fifo, len, recsize);
/* 2단계: in 포인터를 recsize만큼 전진 (헤더 건너뛰기) */
kfifo_copy_in(fifo, buf, len, fifo->in + recsize);
smp_wmb();
/* 3단계: 헤더 + 데이터 길이만큼 in 갱신 */
fifo->in += len + recsize;
return len;
}
코드 설명
레코드 기반 kfifo는 가변 길이 메시지를 지원하기 위해 각 레코드 앞에 길이 헤더를 추가합니다.
- recsize == 11바이트 헤더로 최대 255바이트 레코드를 지원합니다. 소형 메시지에 적합합니다.
- recsize == 22바이트 리틀 엔디안 헤더로 최대 65535바이트 레코드를 지원합니다. 헤더 자체도 wrap-around될 수 있어 바이트 단위로 기록합니다.
- len + recsize가용 공간 검사 시 헤더 크기를 포함해야 합니다. 데이터만 검사하면 헤더가 기존 데이터를 덮어쓸 수 있습니다.
- fifo->in + recsize데이터 복사 시작 위치는 현재 in에서 헤더 크기를 건너뛴 위치입니다.
kfifo 성능 벤치마크와 튜닝
kfifo의 성능 특성을 정량적으로 분석하고, 최적의 성능을 얻기 위한 튜닝 가이드를 제시합니다. 실제 벤치마크 코드와 측정 결과를 기반으로 lock-free의 이점과 한계를 살펴봅니다.
kfifo 크기 선택 가이드
kfifo 크기는 2의 거듭제곱이어야 하며, 요구사항에 따라 적절한 크기를 선택해야 합니다.
/* kfifo 크기 선택 기준 */
/*
* 1. 최소 크기: 생산 속도 × 소비 지연 시간
* 예: UART 115200bps, read() 응답 최대 10ms
* → 115200/8/100 * 10 ≈ 144바이트 → 256 (2의 거듭제곱)
*
* 2. 최대 크기: 사용 가능한 메모리 제약
* - kmalloc: 최대 약 4MB (order 10 페이지)
* - vmalloc 불가: kfifo는 물리적 연속 메모리 필요
*
* 3. 캐시 효율 권장 크기
* - L1 캐시(32~64KB)에 들어가면 최적
* - 4KB (1 페이지)가 가장 일반적
*/
/* 크기별 권장 사용처 */
DEFINE_KFIFO(tiny_fifo, u8, 64); /* 제어 메시지, GPIO 이벤트 */
DEFINE_KFIFO(small_fifo, u8, 256); /* UART, I2C 버퍼 */
DEFINE_KFIFO(medium_fifo, u8, 4096); /* 시리얼 콘솔, 네트워크 */
DEFINE_KFIFO(large_fifo, u8, 65536); /* 오디오 스트림, 대용량 DMA */
/* 동적 크기 결정 (모듈 파라미터 기반) */
static unsigned int fifo_size = 1024;
module_param(fifo_size, uint, 0644);
static int __init my_init(void)
{
int ret;
/* kfifo_alloc이 자동으로 2의 거듭제곱으로 올림 */
ret = kfifo_alloc(&my_fifo, fifo_size, GFP_KERNEL);
if (ret)
return ret;
pr_info("kfifo allocated: requested=%u actual=%u\n",
fifo_size, kfifo_size(&my_fifo));
return 0;
}
lock-free 성능 vs spinlock 보호 성능 비교
/* 벤치마크 모듈: lock-free SPSC vs spinlock-protected kfifo */
#include <linux/kfifo.h>
#include <linux/kthread.h>
#include <linux/time.h>
#define BENCH_ITERATIONS 1000000
#define FIFO_SIZE 4096
#define ELEMENT_SIZE 64
static DEFINE_KFIFO(lockfree_fifo, u8, FIFO_SIZE);
static DEFINE_KFIFO(locked_fifo, u8, FIFO_SIZE);
static DEFINE_SPINLOCK(fifo_lock);
/* lock-free 생산자 (SPSC) */
static int producer_lockfree(void *data)
{
u8 buf[ELEMENT_SIZE];
ktime_t start, end;
int i;
start = ktime_get();
for (i = 0; i < BENCH_ITERATIONS; i++) {
while (kfifo_avail(&lockfree_fifo) < ELEMENT_SIZE)
cpu_relax();
kfifo_in(&lockfree_fifo, buf, ELEMENT_SIZE);
}
end = ktime_get();
pr_info("lock-free: %lld ns/op\n",
ktime_to_ns(ktime_sub(end, start)) / BENCH_ITERATIONS);
return 0;
}
/* spinlock 보호 생산자 (MPSC 안전) */
static int producer_locked(void *data)
{
u8 buf[ELEMENT_SIZE];
ktime_t start, end;
unsigned long flags;
int i;
start = ktime_get();
for (i = 0; i < BENCH_ITERATIONS; i++) {
spin_lock_irqsave(&fifo_lock, flags);
while (kfifo_avail(&locked_fifo) < ELEMENT_SIZE) {
spin_unlock_irqrestore(&fifo_lock, flags);
cpu_relax();
spin_lock_irqsave(&fifo_lock, flags);
}
kfifo_in(&locked_fifo, buf, ELEMENT_SIZE);
spin_unlock_irqrestore(&fifo_lock, flags);
}
end = ktime_get();
pr_info("spinlock: %lld ns/op\n",
ktime_to_ns(ktime_sub(end, start)) / BENCH_ITERATIONS);
return 0;
}
- lock-free SPSC: ~15-25 ns/op (캐시 히트 시)
- spinlock SPSC: ~45-80 ns/op (lock 오버헤드)
- spinlock MPSC (2코어 경합): ~120-300 ns/op (경합 시)
- 캐시 미스 시: 모든 경우 5~10배 성능 저하
lock-free의 핵심 이점은 경합 없이 다른 CPU의 캐시라인을 무효화하지 않는다는 점입니다.
캐시라인 정렬과 false sharing 방지
/* false sharing 방지를 위한 kfifo 래퍼 */
struct aligned_kfifo {
/* in과 out이 같은 캐시라인에 있으면 false sharing 발생 */
struct {
unsigned int in;
} ____cacheline_aligned_in_smp producer;
struct {
unsigned int out;
} ____cacheline_aligned_in_smp consumer;
unsigned int mask;
unsigned int esize;
void *data;
};
/*
* 표준 kfifo의 __kfifo 구조체:
*
* struct __kfifo {
* unsigned int in; ← 생산자가 수정
* unsigned int out; ← 소비자가 수정
* unsigned int mask; ← 읽기 전용
* unsigned int esize; ← 읽기 전용
* void *data; ← 읽기 전용
* };
*
* in과 out이 같은 캐시라인(64바이트)에 위치합니다.
* SPSC에서도 생산자가 in을 쓰면 소비자 CPU의
* 캐시라인이 무효화됩니다 (false sharing).
*
* 하지만 실제로 kfifo는 의도적으로 이를 허용합니다:
* - 구조체 크기를 최소화 (20~24바이트)
* - 소비자가 in을 읽어야 하므로 완전 분리 불가
* - 실측 결과 false sharing 오버헤드 < 10ns
*/
kfifo vs 다른 큐 구현 비교
| 구현 | 연산 시간 | 메모리 | 동기화 | 주요 용도 |
|---|---|---|---|---|
| kfifo | O(1), ~20ns | 연속 배열 2^n 바이트 | SPSC: lock-free | 바이트 스트림, 드라이버 |
| list_head | O(1), ~30ns | 노드별 16B 오버헤드 | spinlock 필수 | 범용 큐, workqueue |
| llist (lockless linked list) | O(1), ~25ns | 노드별 8B 오버헤드 | MPSC: lock-free (CAS) | per-CPU 작업 큐 |
| skb_queue | O(1), ~50ns | skb별 ~256B | spinlock 내장 | 네트워크 패킷 큐 |
| workqueue | O(1), ~100ns | work_struct 32B | per-CPU + pool | 지연 실행 태스크 |
kfifo가 최적인 경우:
- 고정 크기 바이트 스트림 (UART, 오디오)
- SPSC 패턴 (IRQ 핸들러 → 프로세스 컨텍스트)
- 높은 처리량, 낮은 지연 시간 요구
- DMA 연동 필요 (연속 메모리)
kfifo가 부적절한 경우:
- 가변 크기 객체 →
list_head또는 record kfifo - 다중 생산자 →
llist,spinlock+ kfifo - 우선순위 기반 스케줄링 →
rb_tree - 동적 크기 조절 필요 →
list_head
커널 모듈 완전 예제
kfifo를 사용하는 완전한 문자 디바이스 드라이버를 구현합니다.
/dev/myfifo 장치를 생성하고, write()로 데이터를 삽입하고
read()로 추출하며, poll()/select()를 지원합니다.
모듈 파라미터로 fifo 크기를 설정할 수 있습니다.
모듈 아키텍처
전체 소스 코드
/*
* myfifo.c — kfifo 기반 문자 디바이스 드라이버
*
* 기능:
* - /dev/myfifo 생성 (misc device)
* - write()로 데이터 enqueue
* - read()로 데이터 dequeue
* - poll/select 지원
* - ioctl로 상태 조회/리셋
* - 모듈 파라미터로 fifo 크기 설정
*/
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kfifo.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/poll.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#define MYFIFO_IOC_MAGIC 'F'
#define MYFIFO_IOC_RESET _IO(MYFIFO_IOC_MAGIC, 0)
#define MYFIFO_IOC_AVAIL _IOR(MYFIFO_IOC_MAGIC, 1, unsigned int)
#define MYFIFO_IOC_USED _IOR(MYFIFO_IOC_MAGIC, 2, unsigned int)
/* 모듈 파라미터 */
static unsigned int fifo_size = 4096;
module_param(fifo_size, uint, 0444);
MODULE_PARM_DESC(fifo_size, "kfifo 버퍼 크기 (바이트, 2의 거듭제곱)");
/* 디바이스 상태 */
static struct {
struct mutex lock;
DECLARE_KFIFO_PTR(fifo, u8);
wait_queue_head_t read_wq;
wait_queue_head_t write_wq;
} myfifo_dev;
/* ── open ── */
static int myfifo_open(struct inode *inode, struct file *file)
{
return 0;
}
/* ── release ── */
static int myfifo_release(struct inode *inode, struct file *file)
{
return 0;
}
/* ── write: 사용자 → kfifo ── */
static ssize_t myfifo_write(struct file *file,
const char __user *buf,
size_t count, loff_t *ppos)
{
unsigned int copied;
int ret;
/* 비블로킹 모드: 공간이 없으면 즉시 반환 */
if ((file->f_flags & O_NONBLOCK) &&
kfifo_is_full(&myfifo_dev.fifo))
return -EAGAIN;
/* 블로킹 모드: 공간이 생길 때까지 대기 */
ret = wait_event_interruptible(myfifo_dev.write_wq,
!kfifo_is_full(&myfifo_dev.fifo));
if (ret)
return ret;
mutex_lock(&myfifo_dev.lock);
/* 사용자 공간에서 kfifo로 직접 복사 */
ret = kfifo_from_user(&myfifo_dev.fifo, buf, count, &copied);
if (ret) {
mutex_unlock(&myfifo_dev.lock);
return ret;
}
mutex_unlock(&myfifo_dev.lock);
/* 읽기 대기 중인 프로세스 깨우기 */
wake_up_interruptible(&myfifo_dev.read_wq);
return copied;
}
/* ── read: kfifo → 사용자 ── */
static ssize_t myfifo_read(struct file *file,
char __user *buf,
size_t count, loff_t *ppos)
{
unsigned int copied;
int ret;
/* 비블로킹 모드: 데이터가 없으면 즉시 반환 */
if ((file->f_flags & O_NONBLOCK) &&
kfifo_is_empty(&myfifo_dev.fifo))
return -EAGAIN;
/* 블로킹 모드: 데이터가 올 때까지 대기 */
ret = wait_event_interruptible(myfifo_dev.read_wq,
!kfifo_is_empty(&myfifo_dev.fifo));
if (ret)
return ret;
mutex_lock(&myfifo_dev.lock);
/* kfifo에서 사용자 공간으로 직접 복사 */
ret = kfifo_to_user(&myfifo_dev.fifo, buf, count, &copied);
if (ret) {
mutex_unlock(&myfifo_dev.lock);
return ret;
}
mutex_unlock(&myfifo_dev.lock);
/* 쓰기 대기 중인 프로세스 깨우기 */
wake_up_interruptible(&myfifo_dev.write_wq);
return copied;
}
/* ── poll: select/poll/epoll 지원 ── */
static __poll_t myfifo_poll(struct file *file,
struct poll_table_struct *wait)
{
__poll_t mask = 0;
/* 두 대기 큐 모두 등록 */
poll_wait(file, &myfifo_dev.read_wq, wait);
poll_wait(file, &myfifo_dev.write_wq, wait);
mutex_lock(&myfifo_dev.lock);
/* 데이터가 있으면 읽기 가능 */
if (!kfifo_is_empty(&myfifo_dev.fifo))
mask |= EPOLLIN | EPOLLRDNORM;
/* 여유 공간이 있으면 쓰기 가능 */
if (!kfifo_is_full(&myfifo_dev.fifo))
mask |= EPOLLOUT | EPOLLWRNORM;
mutex_unlock(&myfifo_dev.lock);
return mask;
}
/* ── ioctl: 제어 명령 ── */
static long myfifo_ioctl(struct file *file,
unsigned int cmd, unsigned long arg)
{
unsigned int val;
switch (cmd) {
case MYFIFO_IOC_RESET:
mutex_lock(&myfifo_dev.lock);
kfifo_reset(&myfifo_dev.fifo);
mutex_unlock(&myfifo_dev.lock);
wake_up_interruptible(&myfifo_dev.write_wq);
return 0;
case MYFIFO_IOC_AVAIL:
mutex_lock(&myfifo_dev.lock);
val = kfifo_avail(&myfifo_dev.fifo);
mutex_unlock(&myfifo_dev.lock);
return put_user(val, (unsigned int __user *)arg);
case MYFIFO_IOC_USED:
mutex_lock(&myfifo_dev.lock);
val = kfifo_len(&myfifo_dev.fifo);
mutex_unlock(&myfifo_dev.lock);
return put_user(val, (unsigned int __user *)arg);
default:
return -ENOTTY;
}
}
/* ── file_operations ── */
static const struct file_operations myfifo_fops = {
.owner = THIS_MODULE,
.open = myfifo_open,
.release = myfifo_release,
.read = myfifo_read,
.write = myfifo_write,
.poll = myfifo_poll,
.unlocked_ioctl = myfifo_ioctl,
};
/* ── misc device ── */
static struct miscdevice myfifo_misc = {
.minor = MISC_DYNAMIC_MINOR,
.name = "myfifo",
.fops = &myfifo_fops,
};
/* ── 모듈 초기화 ── */
static int __init myfifo_init(void)
{
int ret;
/* kfifo 할당 (자동 2의 거듭제곱 올림) */
ret = kfifo_alloc(&myfifo_dev.fifo, fifo_size, GFP_KERNEL);
if (ret) {
pr_err("myfifo: kfifo_alloc failed (%d)\n", ret);
return ret;
}
mutex_init(&myfifo_dev.lock);
init_waitqueue_head(&myfifo_dev.read_wq);
init_waitqueue_head(&myfifo_dev.write_wq);
/* misc device 등록 → /dev/myfifo 자동 생성 */
ret = misc_register(&myfifo_misc);
if (ret) {
pr_err("myfifo: misc_register failed (%d)\n", ret);
kfifo_free(&myfifo_dev.fifo);
return ret;
}
pr_info("myfifo: loaded (size=%u actual=%u)\n",
fifo_size, kfifo_size(&myfifo_dev.fifo));
return 0;
}
/* ── 모듈 해제 ── */
static void __exit myfifo_exit(void)
{
misc_deregister(&myfifo_misc);
kfifo_free(&myfifo_dev.fifo);
pr_info("myfifo: unloaded\n");
}
module_init(myfifo_init);
module_exit(myfifo_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Example");
MODULE_DESCRIPTION("kfifo-based character device driver");
코드 설명
완전한 kfifo 문자 디바이스 드라이버입니다. 핵심 설계 결정을 설명합니다.
- DECLARE_KFIFO_PTR동적 할당 kfifo를 선언합니다. 모듈 파라미터로 크기를 런타임에 결정할 수 있습니다.
- mutex다중 프로세스가 동시에 read/write할 수 있으므로 mutex로 보호합니다. SPSC가 아니므로 lock-free는 불가능합니다.
- wait_queue_head_t블로킹 I/O와 poll/select를 지원하기 위한 대기 큐입니다. 읽기/쓰기 각각 별도의 대기 큐를 사용합니다.
- kfifo_from_user
copy_from_user()+kfifo_in()을 하나로 합친 함수입니다. 사용자 공간 버퍼에서 직접 kfifo로 복사합니다. - kfifo_to_user
kfifo_out()+copy_to_user()를 하나로 합친 함수입니다. kfifo에서 사용자 공간 버퍼로 직접 복사합니다. - poll_wait대기 큐를 poll 테이블에 등록합니다. 실제 대기는 하지 않고, 이벤트 발생 시 wake_up으로 깨웁니다.
- misc_registermisc 디바이스 프레임워크를 사용하면 major 번호 할당, device 생성 등을 자동으로 처리합니다.
- O_NONBLOCK비블로킹 모드에서는 데이터/공간이 없으면 -EAGAIN을 즉시 반환합니다. epoll 기반 이벤트 루프에서 필수입니다.
Makefile 및 테스트
# Makefile
obj-m += myfifo.o
KDIR ?= /lib/modules/$(shell uname -r)/build
all:
make -C $(KDIR) M=$(PWD) modules
clean:
make -C $(KDIR) M=$(PWD) clean
# 빌드 및 로드
make
sudo insmod myfifo.ko fifo_size=8192
dmesg | tail -1
# myfifo: loaded (size=8192 actual=8192)
# 기본 테스트
echo "Hello kfifo" > /dev/myfifo
cat /dev/myfifo
# Hello kfifo
# 비블로킹 읽기 (데이터 없을 때)
python3 -c "
import os, fcntl
fd = os.open('/dev/myfifo', os.O_RDONLY | os.O_NONBLOCK)
try:
data = os.read(fd, 100)
except OSError as e:
print(f'Expected EAGAIN: {e}')
os.close(fd)
"
# 동시 생산자-소비자 테스트
(
# 생산자: 1초마다 데이터 전송
for i in $(seq 1 5); do
echo "message-$i" > /dev/myfifo
sleep 1
done
) &
# 소비자: poll로 대기하며 읽기
timeout 6 cat /dev/myfifo
# 모듈 제거
sudo rmmod myfifo
poll/select 테스트 프로그램
/* test_poll.c — poll()을 사용한 myfifo 테스트 */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <poll.h>
#include <string.h>
int main(void)
{
struct pollfd pfd;
char buf[256];
int ret;
pfd.fd = open("/dev/myfifo", O_RDWR | O_NONBLOCK);
if (pfd.fd < 0) {
perror("open");
return 1;
}
/* 쓰기 가능 확인 후 데이터 전송 */
pfd.events = POLLOUT;
ret = poll(&pfd, 1, 1000);
if (ret > 0 && (pfd.revents & POLLOUT)) {
write(pfd.fd, "test data\n", 10);
printf("Wrote 10 bytes\n");
}
/* 읽기 가능 확인 후 데이터 수신 */
pfd.events = POLLIN;
ret = poll(&pfd, 1, 1000);
if (ret > 0 && (pfd.revents & POLLIN)) {
ret = read(pfd.fd, buf, sizeof(buf) - 1);
buf[ret] = '\0';
printf("Read: %s", buf);
}
close(pfd.fd);
return 0;
}
- 다중 인스턴스:
container_of()와file->private_data를 사용하여 디바이스 파일별로 독립된 kfifo 인스턴스를 할당할 수 있습니다. - procfs/sysfs:
/proc/myfifo_stats또는/sys/class/misc/myfifo/에 통계 정보(사용량, 오버런 횟수 등)를 노출할 수 있습니다. - mmap 지원: kfifo 버퍼를
remap_pfn_range()로 사용자 공간에 매핑하면 시스템 콜 없이 zero-copy 접근이 가능합니다. - 타이머 연동:
hrtimer와 결합하여 주기적으로 kfifo를 flush하는 패턴으로 batching 최적화를 구현할 수 있습니다.
kfifo_in/kfifo_out 단계별 시각화
kfifo의 핵심 동작은 kfifo_in()과 kfifo_out()이 각각 in과 out 포인터를 전진시키며 데이터를 복사하는 것입니다. 이 절에서는 8슬롯(크기 8) 원형 버퍼를 사용하여 데이터 삽입과 추출 과정을 단계별로 시각화합니다.
in은 다음에 쓸 위치, out은 다음에 읽을 위치를 가리킵니다. 실제 인덱스는 in & (size - 1)로 계산하며, 포인터 자체는 단조 증가합니다.
아래 SVG는 초기 상태에서 데이터 3개를 삽입하고 2개를 추출한 뒤, 다시 6개를 삽입하여 래핑(wrapping)이 발생하는 전체 과정을 보여줍니다.
래핑이 발생하면 __kfifo_in() 내부에서 memcpy()가 두 번 호출됩니다. 첫 번째 복사는 현재 in 위치부터 버퍼 끝까지, 두 번째 복사는 버퍼 시작부터 나머지 데이터를 채웁니다.
아래 SVG는 __kfifo_in()의 내부 분할 복사 로직을 단계별로 보여줍니다.
/* include/linux/kfifo.h — __kfifo_in() 핵심 로직 (v6.x) */
static inline unsigned int __kfifo_in(
struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
len = min(len, kfifo_unused(fifo)); /* 남은 공간만큼만 복사 */
/*
* smp_mb() 는 불필요합니다.
* 데이터 쓰기가 in 포인터 갱신보다 먼저 보여야 하므로
* smp_wmb()만 필요합니다.
*/
/* off = in의 실제 버퍼 인덱스 */
l = kfifo_unused(fifo);
l = min(len, fifo->mask + 1 - (fifo->in & fifo->mask));
memcpy(fifo->data + (fifo->in & fifo->mask), buf, l);
memcpy(fifo->data, buf + l, len - l);
/*
* smp_wmb(): 데이터가 확실히 기록된 후에만
* consumer가 새 in 값을 볼 수 있도록 보장합니다.
*/
smp_wmb();
fifo->in += len;
return len;
}
/* include/linux/kfifo.h — __kfifo_out() 핵심 로직 (v6.x) */
static inline unsigned int __kfifo_out(
struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out); /* 가용 데이터만큼 */
/*
* smp_rmb(): out을 읽기 전에 in의 최신 값을 확인합니다.
* producer가 기록한 데이터를 확실히 볼 수 있도록 보장합니다.
*/
smp_rmb();
l = min(len, fifo->mask + 1 - (fifo->out & fifo->mask));
memcpy(buf, fifo->data + (fifo->out & fifo->mask), l);
memcpy(buf + l, fifo->data, len - l);
/*
* smp_mb(): out 포인터 갱신 전에 데이터 읽기가
* 완료되었음을 보장합니다.
*/
smp_mb();
fifo->out += len;
return len;
}
kfifo_out()은 데이터를 추출하면서 out 포인터를 전진시킵니다. 반면 kfifo_out_peek()은 데이터를 읽되 out을 변경하지 않으므로, 확인 후 폐기할지 결정하는 패턴에 적합합니다.
래핑의 수학적 원리
래핑이 발생할 때 분할 복사가 필요한 이유를 수식으로 정리합니다. 버퍼 크기를 S(2의 거듭제곱), 현재 in의 실제 인덱스를 off = in & (S-1), 삽입할 데이터 길이를 len이라 하면:
off + len ≤ S이면 래핑이 발생하지 않습니다.memcpy(buf + off, src, len)한 번으로 충분합니다.off + len > S이면 래핑이 발생합니다. 첫 번째 복사는l = S - off바이트, 두 번째 복사는len - l바이트입니다.
kfifo의 영리한 설계 포인트는 len - l이 0인 경우(래핑 미발생)에도 두 번째 memcpy()가 호출되는 것입니다. 길이가 0인 memcpy()는 아무 작업도 하지 않으므로(no-op), 조건 분기 없이 항상 같은 코드 경로를 실행합니다. 이는 분기 예측 미스를 방지하여 고속 경로의 성능을 최적화합니다.
kfifo_out 래핑 패턴
kfifo_out()도 동일한 분할 패턴을 사용합니다. out 포인터의 실제 인덱스(out & mask)에서 버퍼 끝까지 읽고, 나머지를 버퍼 시작부터 읽습니다. 이 대칭적 구조 덕분에 __kfifo_in()과 __kfifo_out()의 핵심 로직이 거의 동일합니다.
성능 관점에서, 래핑이 발생하는 빈도는 버퍼 크기 대비 평균 삽입/추출 크기에 따라 결정됩니다. 버퍼 크기가 충분히 크면 래핑은 드물게 발생하며, 이때 두 번째 memcpy()의 no-op 최적화가 효과적으로 작동합니다. 반대로 작은 버퍼에 큰 데이터를 반복 삽입하면 매번 래핑이 발생하여 두 번의 memcpy()가 실제 복사를 수행합니다.
Lock-free 정확성 증명
kfifo가 단일 producer/단일 consumer(SPSC) 환경에서 lock 없이 정확하게 동작하는 이유를 형식적으로 분석합니다. 핵심은 세 가지 불변식(invariant)이 항상 유지되는 점입니다.
불변식(Invariant) 정의
- INV-1 (단조 증가):
in과out은 절대 감소하지 않습니다. 각각 자신의 owner(producer/consumer)만 증가시킵니다. - INV-2 (범위 제약):
0 ≤ in - out ≤ size가 항상 성립합니다. 즉, 사용 중인 바이트 수는 0 이상 버퍼 크기 이하입니다. - INV-3 (데이터 가시성): consumer가
in의 새 값을 읽을 때, 해당 위치까지의 데이터가 이미 메모리에 기록되어 있습니다.
메모리 배리어(Memory Barrier) 배치
ARM, PowerPC 등 약한 메모리 순서(weak memory ordering) 아키텍처에서 정확성을 보장하기 위해 세 곳에 배리어가 배치됩니다.
Lock-free SPSC 정확성 증명 — 배리어 배치 근거
Producer 측 (kfifo_in):
STORE data[off..off+len]— 데이터를 버퍼에 기록합니다.smp_wmb()— StoreStore 배리어를 삽입합니다.STORE fifo->in += len— in 포인터를 갱신합니다.
Consumer 측 (kfifo_out):
LOAD fifo->in— 가용 데이터량을 계산합니다.smp_rmb()— LoadLoad 배리어를 삽입합니다.LOAD data[off..off+len]— 데이터를 읽습니다.smp_mb()— Full 배리어(LoadStore)를 삽입합니다.STORE fifo->out += len— out 포인터를 갱신합니다.
증명:
smp_wmb()에 의해 data STORE ≤_wo in STORE (쓰기 순서)가 보장됩니다.smp_rmb()에 의해 in LOAD ≤_ro data LOAD (읽기 순서)가 보장됩니다.- 결합: consumer가
in=N을 관측하면,data[0..N-1]이 모두 기록된 상태임이 보장됩니다 (INV-3). smp_mb()에 의해 data LOAD ≤ out STORE가 보장됩니다. 따라서 producer가out=M을 관측하면,data[0..M-1]이 이미 읽혀서 덮어써도 안전합니다.
ABA 문제가 발생하지 않는 이유: in과 out은 단조 증가하므로 같은 값으로 돌아오지 않습니다 (INV-1). 32비트 unsigned의 경우 4GB 래핑까지 단조 증가합니다. in == out이면 비어있고, in - out == size이면 가득 찬 상태입니다. 언더플로우(unsigned wrapping)에도 뺄셈은 정확합니다. 예를 들어 in=0x00000002, out=0xFFFFFFFE이면 in-out = 4로 정확합니다.
SPSC 정확성 반례(counterexample): 다중 producer에서의 실패
Thread A (producer 1): Thread B (producer 2): ────────────────────── ────────────────────── len_a = kfifo_unused(fifo); len_b = kfifo_unused(fifo); // 둘 다 unused=4로 봄 // 둘 다 unused=4로 봄 memcpy(data+off, buf_a, 3); memcpy(data+off, buf_b, 3); // 같은 위치에 덮어쓰기! // 데이터 손상 발생! smp_wmb(); smp_wmb(); fifo->in += 3; fifo->in += 3; // in이 6 증가 (3+3) // 실제 데이터는 3바이트만 유효
kfifo는 SPSC 전용입니다. 다중 producer/consumer가 필요하면 spinlock으로 감싸거나 kfifo_in_spinlocked()를 사용해야 합니다.
in과 out은 unsigned int이므로 32비트 범위(약 4GB)를 초과하면 자연스럽게 0으로 래핑됩니다. C 언어 표준에서 unsigned 정수의 오버플로우는 정의된 동작(well-defined)이며, in - out 연산은 래핑 후에도 정확한 결과를 반환합니다. 따라서 ABA 문제가 원천적으로 발생하지 않습니다.
x86 TSO와 ARM/PowerPC의 차이
x86 아키텍처는 Total Store Ordering(TSO)을 따르므로, 쓰기→쓰기 순서가 자연스럽게 보장됩니다. 따라서 smp_wmb()는 x86에서 컴파일러 배리어(barrier())로만 축소됩니다. 그러나 ARM과 PowerPC는 약한 메모리 순서를 사용하므로, 실제 하드웨어 배리어 명령어(dmb, lwsync)가 삽입됩니다.
| 배리어 함수 | x86 (TSO) | ARM (약한 순서) | PowerPC |
|---|---|---|---|
smp_wmb() | barrier() (컴파일러만) | dmb ishst | eieio |
smp_rmb() | barrier() (컴파일러만) | dmb ishld | lwsync |
smp_mb() | mfence 또는 lock addl | dmb ish | sync |
이 차이 때문에 x86에서만 테스트한 kfifo 커스텀 구현이 ARM 보드에서 데이터 손상을 일으키는 경우가 빈번합니다. 커널의 표준 kfifo API는 이러한 아키텍처 차이를 내부적으로 처리하므로, 직접 원형 버퍼를 구현하는 것보다 항상 안전합니다.
Linearizability 분석
kfifo의 SPSC 동작은 linearizable합니다. 각 kfifo_in()과 kfifo_out() 호출은 단일 원자적 지점(linearization point)에서 효력을 발휘하는 것으로 모델링할 수 있습니다.
- kfifo_in()의 linearization point:
fifo->in += len실행 시점입니다. 이 시점 이전에 데이터가 이미 기록되어 있으므로(smp_wmb()가 보장), consumer는 일관된 상태를 봅니다. - kfifo_out()의 linearization point:
fifo->out += len실행 시점입니다. 이 시점 이전에 데이터가 이미 읽혀 있으므로(smp_mb()가 보장), producer는 해당 공간이 안전하게 재사용 가능함을 알 수 있습니다.
이러한 성질 덕분에 kfifo는 형식 검증(formal verification) 도구로도 정확성이 입증된 몇 안 되는 커널 자료구조 중 하나입니다.
Lock-free가 가능한 전제 조건
kfifo의 lock-free 동작은 다음 네 가지 전제 조건이 모두 충족될 때만 보장됩니다. 하나라도 위반되면 별도의 동기화 메커니즘이 필요합니다.
- 단일 Writer:
in포인터를 변경하는 주체가 정확히 하나여야 합니다. 두 개 이상의 producer가 동시에kfifo_in()을 호출하면in포인터와 데이터의 원자성이 깨집니다. - 단일 Reader:
out포인터를 변경하는 주체가 정확히 하나여야 합니다. 두 개 이상의 consumer가 동시에kfifo_out()을 호출하면 같은 데이터를 중복 추출하거나 건너뛸 수 있습니다. - 적절한 메모리 배리어: 아키텍처의 메모리 모델에 맞는 배리어가 올바른 위치에 삽입되어 있어야 합니다. 커널 kfifo는 이를 내부적으로 처리합니다.
- 원자적 포인터 갱신:
in과out이unsigned int(32비트)이므로, 대부분의 아키텍처에서 단일 명령어로 갱신됩니다. 64비트 포인터를 사용하는 커스텀 구현에서는 32비트 시스템에서의 원자성을 별도로 확인해야 합니다.
실제 커널 코드에서의 SPSC 패턴 예시
커널에서 kfifo의 lock-free SPSC 패턴이 자연스럽게 형성되는 대표적인 시나리오는 다음과 같습니다.
- IRQ 핸들러(producer) → 커널 스레드(consumer): 하드웨어 인터럽트에서 데이터를 수집하고,
wake_up_interruptible()으로 깨운 커널 스레드가 데이터를 처리합니다. IRQ는 같은 CPU에서 직렬화되므로 producer가 자연스럽게 단일입니다. - softirq/tasklet(producer) → workqueue(consumer): 네트워크 수신 경로에서 패킷 메타데이터를 kfifo에 기록하고, 워크큐가 비동기적으로 처리합니다.
- 커널 스레드(producer) → 사용자 공간 read(consumer): 커널 내부 이벤트를 수집하여 kfifo에 쌓고, 사용자 공간 프로세스가 문자 디바이스를 통해 읽습니다.
이러한 시나리오에서는 아키텍처적으로 producer와 consumer가 하나씩 보장되므로, 추가 lock 없이 kfifo를 안전하게 사용할 수 있습니다.
kfifo와 다른 큐 구현 비교
리눅스 커널에는 다양한 큐(Queue) 구현이 있으며, 각각 고유한 설계 목표와 동시성 모델을 가집니다. 이 절에서는 kfifo를 중심으로 skb_queue, llist, workqueue, ring_buffer를 상세 비교합니다.
큐 선택 가이드라인 — 시나리오별 최적 선택
- IRQ 핸들러 → 커널 스레드 (바이트 스트림):
kfifo(SPSC lock-free, 최소 오버헤드). 예: UART 수신, ADC 샘플링, 센서 데이터 - 네트워크 패킷 큐잉:
skb_queue(sk_buff 전용, 내장 spinlock). 예: 소켓 수신 큐, TC qdisc, netfilter - 다중 CPU → 단일 소비자 (비동기 통지):
llist(lock-free MPSC, cmpxchg). 예: RCU 콜백, IRQ work, call_single_data - 프로세스 컨텍스트 비동기 작업:
workqueue(스레드 풀, 우선순위, 취소 지원). 예: 디스크 I/O 완료, 지연된 해제, 주기적 정리 - 고속 트레이싱 (per-CPU):
ring_buffer(페이지 기반, overwrite 가능). 예: ftrace, perf_event, BPF 링 버퍼
성능 비교 (단일 요소 enqueue/dequeue, x86-64):
| 구현 | 지연 시간 | 비고 |
|---|---|---|
| kfifo | ~5-15ns | 배리어만, 캐시 히트 시 |
| llist | ~20-40ns | cmpxchg 경쟁 시 증가 |
| skb_queue | ~30-60ns | spinlock 경쟁 시 증가 |
| ring_buffer | ~40-80ns | 페이지 관리 오버헤드 |
| workqueue | ~100-500ns | 스레드 깨우기 포함 |
kfifo, 가변 크기 패킷이면 skb_queue, 다중 producer 알림이면 llist, 지연 실행이 필요하면 workqueue, 고속 트레이싱이면 ring_buffer를 선택합니다.
메모리 할당 특성 비교
각 큐 구현의 메모리 할당 전략도 중요한 선택 기준입니다.
- kfifo: 초기화 시 단 한 번 연속 메모리를 할당합니다. 이후 삽입/추출에서 추가 할당이 발생하지 않으므로, IRQ 컨텍스트에서도 안전하게 사용할 수 있습니다. 단, 버퍼 크기를 변경하려면 해제 후 재할당해야 합니다.
- skb_queue: 각 패킷(sk_buff)은 개별적으로
kmalloc()/alloc_skb()로 할당됩니다. 큐 자체는 고정 크기(list_head + spinlock)이지만, 요소 수에 비례하여 메모리를 소비합니다. - llist: 노드를 호출자가 직접 할당하여 전달합니다. llist 자체는 포인터 하나(
llist_head)만 차지합니다. - ring_buffer: per-CPU 페이지 풀로 운영됩니다. 각 CPU마다 독립된 페이지 링을 가지며, 페이지 수는 사용자가
/sys/kernel/debug/tracing/buffer_size_kb로 조정합니다.
캐시 친화성(Cache Friendliness)
kfifo의 연속 메모리 레이아웃은 CPU 캐시 활용에 매우 유리합니다. 순차적으로 접근하는 패턴이므로 하드웨어 프리페처가 효과적으로 작동합니다. 반면, 리스트 기반 큐(skb_queue, llist)는 노드가 메모리 전역에 흩어져 있어 캐시 미스율이 높아질 수 있습니다. 특히 고속 I/O 경로에서 kfifo가 리스트 기반 큐보다 2~5배 빠른 성능을 보이는 주된 이유가 이 캐시 친화성 차이입니다.
그러나 kfifo는 고정 크기 제약이 있으므로, 버퍼가 가득 차면 데이터를 폐기하거나 생산자를 차단해야 합니다. 동적으로 크기가 변하는 워크로드에는 리스트 기반 큐가 더 적합합니다.
BPF 링 버퍼와의 비교
최근 커널(v5.8+)에 도입된 BPF 링 버퍼(bpf_ringbuf)는 kfifo와 유사한 원형 버퍼 설계를 사용하지만, 목적과 구현이 다릅니다. BPF 링 버퍼는 커널 BPF 프로그램에서 사용자 공간으로 이벤트를 효율적으로 전달하기 위해 설계되었으며, mmap()을 통해 사용자 공간과 커널이 같은 메모리를 공유합니다. 반면 kfifo는 커널 내부 전용으로, 사용자 공간 매핑은 직접 구현해야 합니다.
레코드 기반 kfifo 활용
기본 kfifo는 바이트 스트림을 다루지만, 레코드(record) 모드를 사용하면 가변 길이 메시지를 FIFO에 넣고 뺄 수 있습니다. 각 레코드 앞에 1바이트 또는 2바이트 길이 헤더가 자동으로 추가되어, kfifo_out() 시 정확한 크기의 레코드를 복원합니다.
DECLARE_KFIFO vs DECLARE_KFIFO_PTR
kfifo를 선언하는 두 가지 매크로의 차이를 이해하는 것이 중요합니다.
/*
* 레코드 기반 kfifo 선언과 사용 — recsize=2 예제
* 가변 길이 메시지 프로토콜 구현
*/
#include <linux/kfifo.h>
#include <linux/module.h>
/* 메시지 구조체: 가변 길이 페이로드 */
struct msg_header {
__u8 type; /* 메시지 타입 */
__u8 priority; /* 우선순위 */
__u16 payload_len;/* 페이로드 길이 */
};
/*
* DEFINE_KFIFO(name, type, size) — recsize=0 (고정 크기)
* struct KFIFO_REC_1(name) — recsize=1 (최대 255바이트)
* struct KFIFO_REC_2(name) — recsize=2 (최대 65535바이트)
*
* 내부적으로 recsize는 sizeof(type)이 1이면 자동 계산됩니다.
* 명시적으로 레코드 FIFO를 원하면 아래처럼 사용합니다.
*/
/* 레코드 FIFO: unsigned char 타입, 최대 레코드 65535바이트 */
static DECLARE_KFIFO(msg_fifo, unsigned char, 4096);
/* 메시지 전송: 헤더 + 페이로드를 하나의 레코드로 삽입 */
static int send_message(u8 type, u8 prio,
const void *payload, u16 len)
{
struct msg_header hdr = {
.type = type,
.priority = prio,
.payload_len = len,
};
unsigned char tmp[sizeof(hdr) + 1024];
unsigned int total = sizeof(hdr) + len;
if (total > sizeof(tmp))
return -EINVAL;
memcpy(tmp, &hdr, sizeof(hdr));
memcpy(tmp + sizeof(hdr), payload, len);
/* kfifo_in()은 레코드 모드에서 자동으로 길이 헤더를 추가합니다 */
if (kfifo_in(&msg_fifo, tmp, total) != total)
return -ENOSPC;
return 0;
}
/* 메시지 수신: kfifo_out으로 하나의 레코드를 추출 */
static int recv_message(struct msg_header *hdr,
void *payload, u16 max_len)
{
unsigned char tmp[1028];
unsigned int len;
/* kfifo_peek_len(): 다음 레코드의 길이를 확인 (추출하지 않음) */
len = kfifo_peek_len(&msg_fifo);
if (len == 0)
return -EAGAIN; /* FIFO 비어있음 */
if (len > sizeof(tmp))
return -EMSGSIZE;
/* 레코드 모드: 정확히 하나의 레코드를 추출합니다 */
if (kfifo_out(&msg_fifo, tmp, len) != len)
return -EIO;
memcpy(hdr, tmp, sizeof(*hdr));
if (hdr->payload_len > max_len)
return -EMSGSIZE;
memcpy(payload, tmp + sizeof(*hdr), hdr->payload_len);
return hdr->payload_len;
}
/*
* kfifo_peek_len() vs kfifo_len() — 레코드 모드 핵심 차이
*
* kfifo_len(): FIFO에 저장된 총 바이트 수를 반환합니다.
* (레코드 헤더 포함)
*
* kfifo_peek_len(): 다음 레코드의 데이터 길이만 반환합니다.
* (레코드 헤더 제외)
* FIFO가 비어있으면 0을 반환합니다.
*
* 사용 패턴:
*/
while (kfifo_peek_len(&fifo) > 0) {
unsigned int rec_len = kfifo_peek_len(&fifo);
unsigned char *buf = kmalloc(rec_len, GFP_KERNEL);
if (!buf)
break;
/* 정확히 rec_len 바이트의 레코드 하나를 추출합니다 */
kfifo_out(&fifo, buf, rec_len);
process_record(buf, rec_len);
kfree(buf);
}
/*
* 주의: 레코드 모드에서 kfifo_out()에 전달하는 len이
* 실제 레코드 크기보다 작으면 레코드 전체가 폐기됩니다.
* 항상 kfifo_peek_len()으로 확인 후 충분한 버퍼를 할당해야 합니다.
*/
DECLARE_KFIFO로 선언할 때 타입이 unsigned char이고 크기가 256 이하이면 자동으로 recsize=1이 적용될 수 있습니다. 명시적으로 레코드 크기를 제어하려면 STRUCT_KFIFO_REC_1 또는 STRUCT_KFIFO_REC_2 매크로를 사용합니다.
레코드 FIFO의 내부 동작 원리
레코드 모드에서 kfifo_in()을 호출하면, 내부적으로 데이터 앞에 길이 헤더를 삽입합니다. 이 헤더의 크기는 recsize에 의해 결정됩니다.
recsize=1: 1바이트 헤더, 최대 레코드 크기 255바이트입니다. 소형 센서 데이터, 제어 메시지에 적합합니다.recsize=2: 2바이트 헤더(리틀 엔디안), 최대 레코드 크기 65,535바이트입니다. 로그 메시지, 네트워크 패킷 버퍼링에 적합합니다.
kfifo_out()은 헤더에서 길이를 읽어 정확한 크기의 레코드를 추출합니다. 이때 전달된 버퍼 크기가 레코드보다 작으면, 레코드 전체가 폐기(discard)됩니다. 이는 부분 읽기로 인한 상태 불일치를 방지하기 위한 설계입니다.
실전 활용: IPC 메시지 프로토콜
레코드 기반 kfifo는 커널 내부의 간단한 IPC(Inter-Process Communication) 메시지 프로토콜을 구현할 때 유용합니다. 예를 들어, 드라이버와 사용자 공간 데몬 사이에 가변 길이 이벤트를 전달하는 시나리오에서 다음과 같은 패턴을 사용할 수 있습니다.
- 이벤트 생성(Producer): IRQ 핸들러에서 이벤트 헤더와 페이로드를 조합하여
kfifo_in()으로 삽입합니다. - 이벤트 소비(Consumer): 사용자 공간에서
read()시스템 콜을 통해kfifo_to_user()로 레코드를 전달합니다. - 배압(Backpressure): FIFO가 가득 차면
kfifo_in()이 0을 반환하므로, 오래된 이벤트를 폐기하거나wait_event_interruptible()로 대기할 수 있습니다.
kfifo_peek_len()을 사용하면 추출 전에 다음 레코드의 크기를 확인할 수 있으므로, 사용자 공간에서 적절한 크기의 버퍼를 할당한 후 읽기 작업을 수행할 수 있습니다. 이는 ioctl()로 레코드 크기를 먼저 조회하고, 이후 read()로 데이터를 가져오는 2단계 패턴으로 구현됩니다.
kfifo 관련 커널 버그 사례와 교훈
kfifo는 단순한 API를 제공하지만, 잘못된 사용 패턴이 실제 커널 버그로 이어진 사례가 있습니다. 이 절에서는 대표적인 버그 패턴을 분류하고, 각각의 원인과 방지 방법을 설명합니다.
/*
* 버그 사례 모음: kfifo 사용 시 주의해야 할 패턴들
*/
/* ── 사례 1: Non-power-of-2 크기 (v2.6 이전) ──
*
* commit 2e956fb ("kfifo: fix non-power-of-2 size handling")
* 이전에는 2의 거듭제곱이 아닌 크기를 전달하면
* mask 연산이 잘못되어 버퍼 범위를 벗어났습니다.
*
* 잘못된 코드:
* kfifo_alloc(&fifo, 100, GFP_KERNEL);
* // mask = 100 - 1 = 99 = 0x63
* // in & mask ≠ in % 100 → 인덱스 오류
*
* 수정된 코드 (현재):
* kfifo_alloc() 내부에서 roundup_pow_of_two(size) 적용
* 100 → 128로 올림, mask = 127 = 0x7F
*/
/* ── 사례 2: 다중 producer race condition ──
*
* 증상: 고부하 환경에서 간헐적 데이터 손상
* 원인: 두 IRQ 핸들러가 같은 kfifo에 동시 삽입
*
* 잘못된 코드: */
static DEFINE_KFIFO(shared_fifo, u8, 1024);
/* IRQ handler A */
static irqreturn_t handler_a(int irq, void *dev)
{
u8 data = read_sensor_a();
kfifo_in(&shared_fifo, &data, 1); /* BUG: lock 없음! */
return IRQ_HANDLED;
}
/* 올바른 코드: */
static DEFINE_SPINLOCK(fifo_lock);
static irqreturn_t handler_a_fixed(int irq, void *dev)
{
u8 data = read_sensor_a();
kfifo_in_spinlocked(&shared_fifo, &data, 1, &fifo_lock);
return IRQ_HANDLED;
}
/* ── 사례 3: esize 불일치 ──
*
* DECLARE_KFIFO(fifo, struct small_msg, 32);
* // sizeof(struct small_msg) = 8
*
* struct big_msg msg; // sizeof = 16
* kfifo_in(&fifo, &msg, 1);
* // 컴파일러 경고 발생 (타입 불일치)
* // 런타임: 8바이트만 복사 → 데이터 잘림
*
* 해결: 타입 안전 매크로 경고를 무시하지 않습니다.
* -Werror와 함께 빌드하면 컴파일 시 차단됩니다.
*/
/* ── 사례 4: 해제 순서 오류 ──
*
* 증상: rmmod 시 간헐적 oops
* 원인: worker 스레드가 kfifo_out() 중에 모듈 해제
*/
static void buggy_cleanup(void)
{
kfifo_free(&my_fifo); /* BUG: worker가 아직 사용 중! */
cancel_work_sync(&my_work);
}
static void correct_cleanup(void)
{
cancel_work_sync(&my_work); /* 먼저 worker 종료 대기 */
kfifo_free(&my_fifo); /* 그 다음 안전하게 해제 */
}
- kfifo 크기는 항상 2의 거듭제곱으로 지정합니다 (최신 커널이 자동 올림하더라도 명시적이 좋습니다).
- SPSC 외 시나리오에서는 반드시
kfifo_in_spinlocked()/kfifo_out_spinlocked()을 사용합니다. - 선언 타입과 사용 타입이 일치하는지 확인합니다.
-Werror빌드를 적용합니다. - 모듈 해제 시 모든 접근자(worker, timer, tasklet)를 먼저 정지합니다.
- 커스텀 원형 버퍼를 구현하지 말고, 검증된
kfifoAPI를 사용합니다.
디버깅 도구 활용
kfifo 관련 버그를 추적할 때 활용할 수 있는 커널 디버깅 도구들을 소개합니다.
- KASAN (Kernel Address Sanitizer):
CONFIG_KASAN=y로 빌드하면 Use-After-Free, 버퍼 오버플로우를 런타임에 감지합니다. kfifo 버퍼 범위를 벗어나는 접근을 즉시 보고합니다. - KCSAN (Kernel Concurrency Sanitizer):
CONFIG_KCSAN=y로 빌드하면 데이터 레이스를 감지합니다. 다중 producer가 동시에 kfifo에 접근하는 경쟁 조건을 잡아냅니다. - lockdep:
CONFIG_PROVE_LOCKING=y로 빌드하면kfifo_in_spinlocked()에서 사용하는 lock의 순서 위반을 감지합니다. - ftrace:
trace_printk()를 kfifo 연산 전후에 삽입하여 in/out 포인터 값의 변화를 추적할 수 있습니다.
버전별 API 변경 이력
kfifo API는 커널 버전에 따라 상당한 변화를 겪었습니다. 드라이버를 다른 커널 버전에 포팅할 때 주의해야 합니다.
| 커널 버전 | 변경 사항 |
|---|---|
v2.6.10 | 최초 kfifo 도입 (Stelian Pop). kfifo_alloc()/kfifo_put()/kfifo_get() |
v2.6.33 | Stefani Seibold의 대규모 재설계. 타입 안전 매크로(DECLARE_KFIFO, DEFINE_KFIFO) 도입 |
v2.6.36 | 레코드 모드(STRUCT_KFIFO_REC_1/2) 추가, kfifo_to_user()/kfifo_from_user() 추가 |
v3.x | Non-power-of-2 크기 자동 올림(roundup_pow_of_two()) 적용 |
v5.x+ | kfifo_in_spinlocked()/kfifo_out_spinlocked() 안정화, DMA 관련 유틸리티 강화 |
특히 v2.6.33 이전의 구형 API(kfifo_put()/kfifo_get())를 사용하는 코드는 최신 커널에서 컴파일되지 않으므로, 마이그레이션 시 새 매크로 기반 API로 전환해야 합니다.
코드 리뷰 시 확인 포인트
kfifo를 사용하는 코드를 리뷰할 때 다음 항목을 체계적으로 확인하면 대부분의 버그를 사전에 방지할 수 있습니다.
- 생산자/소비자 수: kfifo에 접근하는 코드 경로를 추적하여, 실제로 SPSC가 보장되는지 확인합니다. 특히 softirq와 IRQ가 동시에 producer 역할을 하지 않는지 점검합니다.
- 크기 적정성: 최악의 경우(burst traffic) 생산 속도 대비 소비 속도를 계산하여, 오버플로우가 발생하지 않는 버퍼 크기를 설정합니다. 일반적으로 평균 사용량의 4~8배를 권장합니다.
- 에러 처리:
kfifo_in()의 반환값(실제 삽입 바이트 수)을 확인하여, 버퍼 풀 상황을 적절히 처리하는지 검증합니다. - 수명 관리: kfifo가 모듈 또는 디바이스의 수명과 일치하는지, 모든 접근자가 정지된 후에만 해제되는지 확인합니다.
- 타입 일관성:
DECLARE_KFIFO에서 선언한 타입과kfifo_in()/kfifo_out()에서 사용하는 타입이 일치하는지 확인합니다. - 레코드 모드 호환성: 레코드 FIFO를 사용하는 경우,
kfifo_peek_len()으로 크기를 확인한 후 적절한 버퍼를 할당하는지 검증합니다.
kfifo 헬퍼 함수 내부 구현 분석
kfifo_copy_in()은 이미 앞 절에서 다루었습니다. 이 절에서는 대칭 함수인 kfifo_copy_out()과,
크기 계산의 핵심인 kfifo_unused(), kfifo_len() 헬퍼 매크로의 실제 구현을 분석합니다.
이 함수들은 사용자에게 직접 노출되지 않지만, 모든 kfifo 연산의 기반이 되는 내부 로직입니다.
kfifo_copy_out: 분할 읽기 로직
/* lib/kfifo.c — kfifo_copy_out 내부 */
static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
/* kfifo_copy_in과 동일한 분할 패턴 */
l = min(len, size - off);
memcpy(dst, fifo->data + off, l); /* 1차: off부터 끝까지 */
memcpy(dst + l, fifo->data, len - l); /* 2차: 처음부터 나머지 */
}
코드 설명
kfifo_copy_out()은 kfifo_copy_in()의 완전 대칭 함수입니다. 소스와 목적지의 방향만 반대입니다.
- off &= fifo->maskout 포인터의 절대값을 버퍼 내 오프셋으로 변환합니다.
kfifo_copy_in에서 in에 대해 수행하는 것과 동일합니다. - esize 곱셈구조체 배열 모드에서는 요소 개수 기반 인덱스를 바이트 기반으로 변환합니다. esize=0(바이트 스트림)이면 변환하지 않습니다.
- memcpy 방향
copy_in은src → fifo->data이고,copy_out은fifo->data → dst입니다. 분할 로직은 동일합니다. - len - l == 0래핑이 없으면 두 번째 memcpy는 길이 0으로 호출되어 no-op가 됩니다. 분기 없는 일관된 코드 경로입니다.
kfifo_unused / kfifo_len: unsigned 뺄셈의 핵심
/* include/linux/kfifo.h — 크기 계산 헬퍼 */
/*
* kfifo_len — 현재 저장된 요소 수를 반환합니다.
*
* in과 out이 모두 unsigned int이므로, in이 오버플로되어
* out보다 작은 값을 가져도 뺄셈 결과는 정확합니다.
*
* 예: in = 0x00000003, out = 0xFFFFFFFE
* in - out = 0x00000005 (5개 요소)
*/
#define kfifo_len(fifo) \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
__tmp->kfifo.in - __tmp->kfifo.out; \
})
/*
* kfifo_unused — 남은 빈 슬롯 수를 반환합니다.
*
* (mask + 1)은 버퍼 전체 크기입니다.
* kfifo_len()을 빼면 사용 가능한 공간이 됩니다.
*/
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
return (fifo->mask + 1) - (fifo->in - fifo->out);
}
/*
* kfifo_is_empty / kfifo_is_full — 상태 판별
*
* empty: in == out → 데이터 없음
* full: in - out == size → 빈 공간 없음
*
* unsigned 래핑 후에도 이 조건은 정확합니다.
*/
#define kfifo_is_empty(fifo) \
((fifo)->kfifo.in == (fifo)->kfifo.out)
#define kfifo_is_full(fifo) \
(kfifo_len(fifo) > (fifo)->kfifo.mask)
코드 설명
이 헬퍼들은 unsigned 산술의 속성을 활용하여 조건 분기 없이 상태를 계산합니다.
- in - outC 표준에서 unsigned 정수의 뺄셈은 모듈러 연산(modulo 232)으로 정의됩니다. 따라서 in이 오버플로하여 out보다 작아져도, 차이는 항상 올바른 값입니다.
- mask + 1mask는
size - 1이므로,mask + 1은 버퍼 전체 크기를 복원합니다. 별도의 size 필드를 저장하지 않아 구조체 크기를 절약합니다. - typeof((fifo) + 1)매크로에서 타입 안전성을 확보하기 위한 기법입니다.
+ 1은 배열 decay를 유도하여 포인터 타입을 추론합니다. - kfifo_is_full
len > mask는len == mask + 1(즉len == size)과 동일합니다. 등호 대신 부등호를 사용하여 방어적으로 작성되었습니다.
kfifo_to_user / kfifo_from_user: 사용자 공간 복사
/* include/linux/kfifo.h — 사용자 공간 복사 헬퍼 */
/*
* kfifo_to_user — kfifo에서 사용자 공간으로 데이터 복사
*
* copy_to_user()를 사용하므로, 페이지 폴트가 발생할 수 있습니다.
* 따라서 atomic 컨텍스트(IRQ, softirq)에서는 사용할 수 없습니다.
*
* 내부적으로 kfifo_copy_out()과 동일한 분할 패턴을 사용하되,
* memcpy 대신 copy_to_user를 호출합니다.
*/
static int kfifo_copy_to_user(struct __kfifo *fifo,
void __user *to, unsigned int len,
unsigned int off, unsigned int *copied)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
int ret;
off &= fifo->mask;
if (esize != 0) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
/* 1차: off부터 버퍼 끝까지 → 사용자 공간 */
ret = copy_to_user(to, fifo->data + off, l);
if (unlikely(ret)) {
*copied = l - ret;
return -EFAULT;
}
/* 2차: 버퍼 시작부터 → 사용자 공간 */
ret = copy_to_user(to + l, fifo->data, len - l);
if (unlikely(ret)) {
*copied = len - ret;
return -EFAULT;
}
*copied = len;
return 0;
}
코드 설명
사용자 공간 복사는 memcpy() 대신 copy_to_user()를 사용하는 점만 다릅니다.
- copy_to_user커널 → 사용자 공간 복사 시 접근 권한 검사와 페이지 폴트 처리를 수행합니다. 실패 시 복사하지 못한 바이트 수를 반환합니다.
- unlikely(ret)정상적인 사용자 포인터라면 실패할 확률이 낮으므로, 분기 예측 힌트로 성능을 최적화합니다.
- *copied부분 복사가 발생하면 실제 복사된 바이트 수를 기록합니다. 호출자는 이 값으로 out 포인터를 적절히 전진시킵니다.
- atomic 컨텍스트 금지
copy_to_user()는 슬립할 수 있으므로, IRQ 핸들러에서는kfifo_out()으로 커널 버퍼에 먼저 복사한 뒤 프로세스 컨텍스트에서 사용자에게 전달해야 합니다.
kfifo 원형 버퍼 링(Ring) 시각화
지금까지 kfifo를 선형 배열로 시각화했습니다. 이 절에서는 실제 동작과 더 가까운 원형(ring) 형태로 버퍼 상태를 시각화합니다. 원형으로 보면 wrap-around가 자연스러운 연속 동작임을 직관적으로 이해할 수 있습니다.
in & mask == out & mask인 경우가 두 가지 있습니다.
하나는 empty(in == out)이고, 다른 하나는 full(in - out == size)입니다.
kfifo는 in과 out을 wrap하지 않는 절대값으로 유지하기 때문에, 이 두 상태를 in == out인지 in - out == size인지로 정확히 구별할 수 있습니다.
이것이 "한 슬롯을 비워두는" 전통적 원형 버퍼 구현보다 공간 효율적인 이유입니다.
unsigned 오버플로 인덱스 래핑 시각화
kfifo의 가장 영리한 설계 중 하나는 in과 out을 mod 연산 없이 자유롭게 증가시키고,
32비트 unsigned 오버플로가 자연스럽게 처리되도록 한 것입니다.
아래 다이어그램은 32비트 경계(0xFFFFFFFF)를 넘어갈 때 인덱스가 어떻게 동작하는지 보여줍니다.
이 설계의 수학적 근거는 다음과 같습니다. unsigned int의 연산은 모듈러 산술(modular arithmetic, mod 232)입니다.
두 값 a와 b에 대해 a - b (mod 232)는 실제 차이가 232 미만인 한 항상 정확합니다.
kfifo의 버퍼 크기는 최대 수 MB이므로 232(약 4GB)에 비해 매우 작아, 이 조건이 항상 성립합니다.
Producer-Consumer 상태 전이 다이어그램
kfifo의 SPSC(단일 생산자/단일 소비자) 프로토콜을 상태 머신으로 모델링하면, lock-free 동작의 정확성을 더 명확히 이해할 수 있습니다. 아래 다이어그램은 producer와 consumer의 각 단계에서 수행하는 연산과 메모리 배리어의 위치를 보여줍니다.
이 상태 전이 다이어그램의 핵심은 세 가지 배리어가 각각 하나의 순서 보장을 담당하는 것입니다.
- smp_wmb() (Producer) — 데이터 STORE가 in STORE보다 먼저 보이도록 합니다. Consumer가 새 in 값을 읽으면 데이터가 이미 준비되어 있습니다.
- smp_rmb() (Consumer) — in LOAD가 데이터 LOAD보다 먼저 수행되도록 합니다. in 값으로 계산한 범위만큼만 안전하게 읽습니다.
- smp_mb() (Consumer) — 데이터 LOAD가 out STORE보다 먼저 완료되도록 합니다. Producer가 새 out 값을 보면 데이터가 이미 읽혀서 덮어써도 안전합니다.
구현 예제: 고속 센서 집계 드라이버
kfifo를 활용한 실전 디바이스 드라이버 패턴을 소개합니다. 이 예제는 고속 센서(예: 가속도계, 자이로스코프)에서 IRQ로 데이터를 수집하고, 사용자 공간 애플리케이션이 배치(batch) 단위로 읽는 패턴입니다. SPSC 구조가 자연스럽게 형성되어 lock-free kfifo를 최적으로 활용합니다.
/*
* sensor_kfifo.c — 고속 센서 데이터 집계 드라이버 (kfifo 활용)
*
* 구조: IRQ handler (producer) → read() (consumer)
* 동기화: SPSC lock-free (spinlock 불필요)
*/
#include <linux/module.h>
#include <linux/kfifo.h>
#include <linux/interrupt.h>
#include <linux/miscdevice.h>
#include <linux/wait.h>
#include <linux/poll.h>
struct sensor_sample {
u64 timestamp; /* ktime_get_ns() 타임스탬프 */
s16 x, y, z; /* 3축 가속도 값 */
u16 status; /* 상태 플래그 */
}; /* sizeof = 16 바이트 */
struct sensor_dev {
struct miscdevice misc;
DECLARE_KFIFO(fifo, struct sensor_sample, 256);
wait_queue_head_t wq;
int irq;
void __iomem *base;
atomic_t overrun_count;
};
/*
* IRQ 핸들러: producer
*
* 하드웨어 IRQ는 같은 IRQ 라인에서 직렬화되므로
* 이 핸들러는 자연스럽게 단일 producer입니다.
*/
static irqreturn_t sensor_irq_handler(int irq, void *dev_id)
{
struct sensor_dev *sdev = dev_id;
struct sensor_sample sample;
/* 하드웨어 레지스터에서 센서 데이터 읽기 */
sample.timestamp = ktime_get_ns();
sample.x = ioread16(sdev->base + 0x00);
sample.y = ioread16(sdev->base + 0x02);
sample.z = ioread16(sdev->base + 0x04);
sample.status = ioread16(sdev->base + 0x06);
/*
* kfifo_in(): lock-free 삽입
* 반환값 = 실제 삽입된 요소 수 (0이면 가득 참)
*/
if (kfifo_in(&sdev->fifo, &sample, 1) == 0)
atomic_inc(&sdev->overrun_count);
/* 대기 중인 reader를 깨움 */
wake_up_interruptible(&sdev->wq);
return IRQ_HANDLED;
}
/*
* read(): consumer
*
* 사용자 공간 프로세스가 호출하므로 자연스럽게 단일 consumer입니다.
* (파일 디스크립터당 하나의 프로세스만 read() 진행)
*/
static ssize_t sensor_read(struct file *file,
char __user *buf, size_t count, loff_t *ppos)
{
struct sensor_dev *sdev = file->private_data;
unsigned int copied;
int ret;
/* count를 sample 크기의 정수배로 정렬 */
count = (count / sizeof(struct sensor_sample))
* sizeof(struct sensor_sample);
if (count == 0)
return -EINVAL;
/* 비블로킹 모드 */
if (file->f_flags & O_NONBLOCK) {
if (kfifo_is_empty(&sdev->fifo))
return -EAGAIN;
} else {
/* 블로킹 모드: 데이터가 올 때까지 대기 */
ret = wait_event_interruptible(sdev->wq,
!kfifo_is_empty(&sdev->fifo));
if (ret)
return ret;
}
/*
* kfifo_to_user(): lock-free로 커널 → 사용자 공간 복사
* 내부적으로 copy_to_user()와 분할 복사를 수행합니다.
*/
ret = kfifo_to_user(&sdev->fifo, buf, count, &copied);
if (ret)
return ret;
return copied;
}
/* poll(): 데이터 가용성 통지 */
static __poll_t sensor_poll(struct file *file,
struct poll_table_struct *wait)
{
struct sensor_dev *sdev = file->private_data;
poll_wait(file, &sdev->wq, wait);
if (!kfifo_is_empty(&sdev->fifo))
return EPOLLIN | EPOLLRDNORM;
return 0;
}
코드 설명
이 드라이버는 IRQ → read() SPSC 패턴의 교과서적 구현입니다.
- DECLARE_KFIFO(fifo, struct sensor_sample, 256)256개의 센서 샘플(256 × 16B = 4KB)을 저장하는 정적 kfifo를 구조체에 내장합니다. L1 캐시에 들어가는 최적 크기입니다.
- atomic_t overrun_countkfifo가 가득 차서 데이터를 잃을 때마다 카운터를 증가시킵니다. sysfs를 통해 모니터링할 수 있습니다.
- IRQ → wake_up_interruptible데이터 삽입 후 즉시 reader를 깨웁니다. kfifo_in()이 smp_wmb()를 내부적으로 호출하므로, wakeup 시점에 데이터가 이미 가시적입니다.
- count 정렬사용자가 요청한 바이트 수를 샘플 크기의 정수배로 내림합니다. 부분 샘플 전달을 방지합니다.
- kfifo_to_user()kfifo_out() + copy_to_user()를 한 번에 수행합니다. 중간 커널 버퍼가 불필요하여 메모리 복사를 절반으로 줄입니다.
구현 예제: DMA 링 버퍼 수집 패턴
고속 데이터 수집 장치(ADC, 네트워크 어댑터 등)에서는 DMA가 하드웨어 버퍼에서 메모리로 데이터를 직접 전송합니다. kfifo의 원형 버퍼 구조를 DMA 전송과 결합하면, CPU 오버헤드를 최소화하면서 연속적인 데이터 스트리밍을 구현할 수 있습니다. 핵심은 kfifo 버퍼를 DMA 목적지(destination)로 직접 사용하는 것입니다.
/*
* dma_ring.c — DMA 링 버퍼 수집 패턴
*
* DMA 엔진이 하드웨어 → kfifo 버퍼로 직접 전송합니다.
* kfifo의 scatter/gather API를 활용하여 래핑 경계를
* DMA 엔진에게 투명하게 처리합니다.
*/
#include <linux/kfifo.h>
#include <linux/dma-mapping.h>
#include <linux/scatterlist.h>
struct dma_collector {
struct __kfifo fifo;
void *buf; /* DMA 가능 버퍼 */
dma_addr_t dma_handle; /* DMA 물리 주소 */
struct device *dev;
size_t buf_size; /* 2의 거듭제곱 */
};
/*
* 초기화: DMA 일관성 버퍼 할당 + kfifo 연결
*
* dma_alloc_coherent()로 할당하면 CPU와 디바이스 모두
* 캐시 일관성이 보장됩니다 (snoop 또는 uncached).
*/
static int dma_collector_init(struct dma_collector *dc,
struct device *dev, size_t size)
{
/* size를 2의 거듭제곱으로 올림 */
size = roundup_pow_of_two(size);
dc->buf = dma_alloc_coherent(dev, size,
&dc->dma_handle, GFP_KERNEL);
if (!dc->buf)
return -ENOMEM;
dc->dev = dev;
dc->buf_size = size;
/*
* kfifo를 외부 버퍼로 초기화합니다.
* kfifo_init()은 기존 버퍼를 kfifo로 래핑합니다.
* 이 버퍼는 DMA 가능 주소이므로 하드웨어가 직접 쓸 수 있습니다.
*/
return kfifo_init(&dc->fifo, dc->buf, size);
}
/*
* DMA 전송 준비: scatter/gather 리스트 생성
*
* kfifo의 빈 영역을 1~2개의 DMA 세그먼트로 변환합니다.
* 래핑이 발생하면 두 개의 세그먼트가 됩니다.
*/
static int dma_prepare_sg(struct dma_collector *dc,
struct scatterlist *sg, unsigned int *nents,
size_t max_len)
{
unsigned int off, size, l;
unsigned int unused;
unused = dc->buf_size - (dc->fifo.in - dc->fifo.out);
if (unused == 0)
return -ENOSPC;
max_len = min_t(size_t, max_len, unused);
off = dc->fifo.in & dc->fifo.mask;
size = dc->buf_size;
/* 끝까지의 연속 공간 */
l = min_t(size_t, max_len, size - off);
sg_init_table(sg, 2);
/* 세그먼트 1: off부터 버퍼 끝 (또는 데이터 끝) */
sg_dma_address(&sg[0]) = dc->dma_handle + off;
sg_dma_len(&sg[0]) = l;
if (max_len - l > 0) {
/* 세그먼트 2: 버퍼 시작부터 나머지 (래핑) */
sg_dma_address(&sg[1]) = dc->dma_handle;
sg_dma_len(&sg[1]) = max_len - l;
*nents = 2;
} else {
sg_mark_end(&sg[0]);
*nents = 1;
}
return 0;
}
/*
* DMA 완료 콜백: in 포인터 전진
*
* DMA 엔진이 데이터 전송을 완료하면 호출됩니다.
* DMA coherent 버퍼이므로 별도의 cache invalidate가 불필요합니다.
*/
static void dma_complete_callback(void *data)
{
struct dma_collector *dc = data;
size_t transferred = dc->last_transfer_len;
/*
* smp_wmb()는 DMA coherent 버퍼에서는 불필요합니다.
* DMA 엔진이 메모리에 직접 기록하고,
* coherent 할당이 캐시 일관성을 보장하기 때문입니다.
*
* 그러나 kfifo_in()을 사용하지 않고 직접 in을 갱신하므로,
* 컴파일러 배리어는 필요합니다.
*/
barrier();
dc->fifo.in += transferred;
}
/*
* 해제: DMA 버퍼 반환
*/
static void dma_collector_destroy(struct dma_collector *dc)
{
dma_free_coherent(dc->dev, dc->buf_size,
dc->buf, dc->dma_handle);
}
코드 설명
DMA와 kfifo를 결합하는 핵심은 kfifo 버퍼를 DMA 타겟으로 재사용하는 것입니다.
- dma_alloc_coherentCPU와 디바이스 간 캐시 일관성이 보장되는 메모리를 할당합니다. DMA 전송 후 별도의 캐시 무효화가 불필요합니다.
- kfifo_init외부에서 할당된 버퍼를 kfifo로 래핑합니다.
kfifo_alloc()과 달리 이미 할당된 DMA 버퍼를 재활용할 수 있습니다. - sg 분할kfifo의 빈 영역이 버퍼 끝을 넘어가면(래핑), 두 개의 scatter/gather 세그먼트로 분할합니다. DMA 엔진은 이 두 세그먼트를 연속으로 전송합니다.
- barrier()DMA coherent 메모리에서는 하드웨어 메모리 배리어가 불필요하지만, 컴파일러 재배치 방지를 위해
barrier()를 사용합니다. - fifo.in += transferredDMA 완료 후 직접 in을 갱신합니다.
kfifo_in()을 호출하지 않는 이유는 데이터가 이미 DMA에 의해 버퍼에 기록되었기 때문입니다.
dma_alloc_coherent() 대신 dma_map_single()(streaming DMA)을 사용하는 경우, DMA 전송 후 반드시 dma_sync_single_for_cpu()를 호출하여 캐시를 무효화해야 합니다.
이 동기화를 생략하면 CPU가 DMA 이전의 캐시된 데이터를 읽어 데이터 손상이 발생합니다.
성능이 중요한 경우 streaming DMA가 더 효율적이지만, 정확한 동기화 관리가 필수입니다.
관련 문서
kfifo와 관련된 다른 주제를 더 깊이 이해하고 싶다면 다음 문서를 참고하세요.
참고 자료
- include/linux/kfifo.h — Bootlin Elixir — kfifo 매크로, 인라인 함수, 타입 안전 래퍼 헤더 소스 코드입니다
- lib/kfifo.c — Bootlin Elixir — kfifo 핵심 구현 함수(할당, 복사, 해제) 소스 코드입니다
- samples/kfifo/ — Bootlin Elixir — 커널 내장 kfifo 예제 코드(bytestream, inttype, record)를 확인할 수 있습니다
- Kernel API — kfifo interface — 공식 커널 문서에서 kfifo API 레퍼런스를 제공합니다
- A lockless ring-buffer (LWN.net, 2009) — kfifo의 lock-free 설계 원리와 메모리 배리어 활용을 설명하는 기사입니다
- The kfifo rewrite (LWN.net, 2009) — Stefani Seibold의 kfifo API 재설계(타입 안전 매크로 도입)를 다룬 기사입니다
- kfifo, pair of pointers (LWN.net, 2009) — kfifo의 in/out 포인터 기반 순환 버퍼 구조를 분석한 기사입니다
- DECLARE_KFIFO 매크로 — Bootlin Elixir — 정적 kfifo 선언 매크로의 구현을 확인할 수 있습니다
- Circular Buffers — Kernel Docs — 리눅스 커널에서의 순환 버퍼 사용 가이드라인과 메모리 배리어 규칙을 설명합니다