kfifo (Circular Buffer)

Linux 커널의 범용 FIFO 큐 구현인 kfifo(원형 버퍼)의 설계 원리, lock-free 단일 생산자/소비자 패턴, API 사용법, 디바이스 드라이버와 로그 버퍼 활용 사례, 성능 최적화 기법을 종합적으로 다룹니다.

필수 배경 지식: C 언어 포인터, 비트 연산, 모듈로 연산(%)을 이해하고 있어야 합니다.
일상 비유: kfifo는 회전 초밥 컨베이어 벨트와 비슷합니다. 초밥을 놓는 사람(producer)과 가져가는 사람(consumer)이 독립적으로 동작하며, 벨트가 원형이므로 끝에 도달하면 다시 처음으로 돌아옵니다. 버퍼가 가득 차면 생산자가 대기하고, 비어있으면 소비자가 대기합니다.

핵심 요약

  • FIFO 순서 — First In First Out, 먼저 들어간 데이터가 먼저 나옵니다.
  • 고정 크기 — 생성 시 크기가 결정되며, 반드시 2의 거듭제곱이어야 합니다.
  • Lock-free — 단일 생산자/단일 소비자 구성에서 lock 없이 동작합니다.
  • Wrap-around — 버퍼 끝에 도달하면 자동으로 처음으로 돌아갑니다.
  • 빠른 연산 — in/out 인덱스를 모듈로 연산 대신 비트 마스킹으로 처리합니다.

단계별 이해

  1. 원형 버퍼 개념
    선형 배열을 원형으로 사용하는 원리를 이해합니다.
  2. 인덱스 관리
    in/out 포인터가 어떻게 순환하는지 확인합니다.
  3. Lock-free 조건
    단일 생산자/소비자에서 왜 lock이 필요 없는지 학습합니다.
  4. API 사용
    kfifo_alloc, kfifo_in, kfifo_out 함수를 익힙니다.
실사용 사례: 디바이스 드라이버의 데이터 버퍼링, 커널 로그 버퍼, 네트워크 패킷 큐, 문자 디바이스 입출력 버퍼, 비동기 I/O 큐 등 거의 모든 곳에서 사용됩니다.

개요 (Overview)

kfifo(kernel FIFO)는 커널이 제공하는 범용 원형 버퍼(circular buffer) 구현입니다. <linux/kfifo.h>에 정의되어 있으며, 고정 크기 FIFO 큐를 효율적으로 관리합니다.

kfifo가 널리 사용되는 이유:

⚠️

Lock-free 제약: kfifo의 lock-free 특성은 단일 생산자와 단일 소비자일 때만 보장됩니다. 다중 생산자(MPSC) 또는 다중 소비자(SPMC)가 필요하면 명시적인 동기화(spinlock)가 필요합니다.

원형 버퍼 원리 (Circular Buffer Concept)

원형 버퍼는 선형 배열을 원형처럼 사용하는 자료구조입니다. 두 개의 인덱스(in, out)를 유지합니다:

원형 버퍼 구조 [0] [1] [2] A [3] B [4] C [5] [6] [7] out (read) in (write) 버퍼 상태 사용 중: in - out = 3개 빈 공간: size - (in - out) = 5개 Wrap: in이나 out이 size에 도달하면 0으로
원형 버퍼: out(읽기 위치)와 in(쓰기 위치) 사이에 유효 데이터 존재

인덱스 관리 (Index Management)

kfifo의 핵심은 인덱스를 어떻게 효율적으로 관리하는가입니다:

/* 전통적인 원형 버퍼 (느림) */
unsigned int in = 0, out = 0;
#define SIZE 8

/* 쓰기 */
buffer[in % SIZE] = data;
in++;

/* 읽기 */
data = buffer[out % SIZE];
out++;

/* 문제: 모듈로(%) 연산은 느림! */
/* kfifo 방식 (빠름) - 크기가 2의 거듭제곱일 때 */
#define SIZE 8  /* 2^3 */
#define MASK (SIZE - 1)  /* 0b111 */

/* 쓰기 */
buffer[in & MASK] = data;  /* 비트 AND는 1사이클! */
in++;

/* 읽기 */
data = buffer[out & MASK];
out++;

/* in, out은 계속 증가하되, 실제 인덱스는 & MASK로 계산 */
💡

왜 2의 거듭제곱? 크기가 2^n이면 index % sizeindex & (size-1)로 대체할 수 있습니다. 모듈로 연산은 수십 사이클이 걸리지만, 비트 AND는 1사이클만에 끝납니다.

Wrap-around 인덱스 비트 레벨 동작

인덱스가 오버플로우될 때 비트 마스킹으로 어떻게 wrap-around되는지 시각화합니다:

kfifo Wrap-around 인덱스 비트 레벨 시각화 Wrap-around: 비트 마스킹으로 인덱스 순환 (크기=8, mask=0b111) 예제 1: in=7 → in=8 (wrap-around 직전 → 직후) in = 7: 0b 0000 0000 0000 0111 mask = 7 (size-1): 0b 0000 0000 0000 0111 in & mask: 0b 0000 0000 0000 0111 = 7 → buf[7]에 접근 in = 8: 0b 0000 0000 0000 1000 mask = 7 (size-1): 0b 0000 0000 0000 0111 in & mask: 0b 0000 0000 0000 0000 = 0 ✓ → buf[0]에 접근 (wrap!) 예제 2: in=15 (거의 가득 참) → in=16 (두 바퀴 돌았음) in = 15: 0b 0000 0000 0000 1111 in & mask (0b111): 0b 0000 0000 0000 0111 = 7 → buf[7] in = 16: 0b 0000 0000 0001 0000 in & mask (0b111): 0b 0000 0000 0000 0000 = 0 ✓ → buf[0] (다시 처음!) 핵심 원리 mask = size - 1 (크기가 2^n일 때): 하위 n비트만 1, 나머지는 0 in & mask: 하위 n비트만 남기고 나머지 제거 → 자동으로 0~(size-1) 범위로 제한 in은 계속 증가하되, 실제 인덱스는 wrap-around → 오버플로우 걱정 없음 (32비트: 4억 번 순환)

kfifo API

struct kfifo 구조

/* include/linux/kfifo.h */
struct __kfifo {
    unsigned int    in;      /* 쓰기 인덱스 (계속 증가) */
    unsigned int    out;     /* 읽기 인덱스 (계속 증가) */
    unsigned int    mask;    /* size - 1 (비트 마스크) */
    unsigned int    esize;   /* 원소 크기 (바이트) */
    void            *data;   /* 실제 버퍼 */
};

#define DECLARE_KFIFO(fifo, type, size) \
    struct __kfifo fifo = { \
        .in = 0, .out = 0, .mask = size - 1, \
        .esize = sizeof(type), \
        .data = (void *)array \
    }

할당과 해제 (Allocation & Deallocation)

/* 동적 할당 */
int kfifo_alloc(struct kfifo *fifo, unsigned int size, gfp_t gfp_mask);

/* 사용 예 */
struct kfifo my_fifo;
int ret = kfifo_alloc(&my_fifo, 1024, GFP_KERNEL);
if (ret) {
    pr_err("kfifo_alloc failed\n");
    return ret;
}

/* 정적 할당 (컴파일 타임) */
DECLARE_KFIFO_PTR(my_fifo, int);
kfifo_alloc(&my_fifo, 128, GFP_KERNEL);

/* 해제 */
void kfifo_free(struct kfifo *fifo);

삽입과 제거 (Enqueue & Dequeue)

/* 데이터 삽입 (enqueue) */
unsigned int kfifo_in(struct kfifo *fifo, const void *buf,
                          unsigned int len);
/* 반환값: 실제로 삽입된 바이트 수 (버퍼 가득 차면 len보다 작을 수 있음) */

/* 데이터 제거 (dequeue) */
unsigned int kfifo_out(struct kfifo *fifo, void *buf,
                           unsigned int len);
/* 반환값: 실제로 읽은 바이트 수 (버퍼 비어있으면 len보다 작을 수 있음) */

/* Peek (읽지 않고 확인만) */
unsigned int kfifo_out_peek(struct kfifo *fifo, void *buf,
                                unsigned int len);

/* 사용 예 */
char data[64] = "Hello, kfifo!";
unsigned int ret;

ret = kfifo_in(&my_fifo, data, strlen(data));
pr_info("Wrote %u bytes\n", ret);

char output[64];
ret = kfifo_out(&my_fifo, output, sizeof(output));
pr_info("Read %u bytes: %s\n", ret, output);

상태 확인 (Query Operations)

/* 버퍼 크기 */
unsigned int kfifo_size(struct kfifo *fifo);

/* 사용 가능한 바이트 수 */
unsigned int kfifo_avail(struct kfifo *fifo);

/* 저장된 데이터 바이트 수 */
unsigned int kfifo_len(struct kfifo *fifo);

/* 비어있는가? */
bool kfifo_is_empty(struct kfifo *fifo);

/* 가득 찼는가? */
bool kfifo_is_full(struct kfifo *fifo);

/* 초기화 (데이터 모두 삭제) */
void kfifo_reset(struct kfifo *fifo);

Lock-free 원리 (Lock-Free Mechanism)

kfifo는 단일 생산자/단일 소비자 구성에서 lock 없이 안전하게 동작합니다. 핵심 원리:

  1. 독립적인 인덱스: in은 producer만, out은 consumer만 수정합니다.
  2. 메모리 배리어: smp_wmb()smp_rmb()로 순서를 보장합니다.
  3. 단조 증가: inout은 절대 감소하지 않으므로 경합이 발생하지 않습니다.
/* lib/kfifo.c - kfifo_in 간략화 버전 */
unsigned int kfifo_in(struct __kfifo *fifo, const void *buf,
                      unsigned int len)
{
    unsigned int l;

    /* 사용 가능한 공간 계산 */
    l = kfifo_unused(fifo);
    if (len > l)
        len = l;

    /* 데이터 복사 */
    __kfifo_in_data(fifo, buf, len, fifo->in);

    /* 쓰기 배리어: 데이터 쓰기가 in 업데이트보다 먼저 보이도록 보장 */
    smp_wmb();

    /* in 인덱스 업데이트 (producer만 수정) */
    fifo->in += len;

    return len;
}

/* kfifo_out 간략화 버전 */
unsigned int kfifo_out(struct __kfifo *fifo, void *buf,
                       unsigned int len)
{
    unsigned int l;

    /* 사용 가능한 데이터 계산 */
    l = fifo->in - fifo->out;
    if (len > l)
        len = l;

    /* 읽기 배리어: in 읽기가 데이터 읽기보다 먼저 보이도록 보장 */
    smp_rmb();

    /* 데이터 복사 */
    __kfifo_out_data(fifo, buf, len, fifo->out);

    /* out 인덱스 업데이트 (consumer만 수정) */
    fifo->out += len;

    return len;
}
⚠️

다중 생산자/소비자: 여러 CPU에서 동시에 kfifo_in()을 호출하거나, 여러 CPU에서 kfifo_out()을 호출하면 경합이 발생합니다. 이 경우 spinlock으로 보호해야 합니다.

메모리 배리어 동작 원리

Lock-free kfifo가 안전하게 동작하는 핵심은 메모리 배리어입니다. 다중 CPU 환경에서의 동작을 시각화합니다:

kfifo 메모리 배리어 다중 CPU 타임라인 Lock-free kfifo: 메모리 배리어로 순서 보장 (다중 CPU) CPU 0 (Producer - kfifo_in) T1: l = size - (in - out) → 사용 가능 공간 계산 T2: memcpy(buf[in], data, len) → 버퍼에 데이터 쓰기 (메모리에 WRITE) T3: smp_wmb() → 쓰기 메모리 배리어 "T2가 먼저 보이도록 강제" T4: fifo→in += len → in 인덱스 증가 Producer는 in만 수정 CPU 1 (Consumer - kfifo_out) T5: l = fifo→in - out → 사용 가능 데이터 계산 T6: smp_rmb() → 읽기 메모리 배리어 "T5의 in 읽기가 T7보다 먼저" T7: memcpy(data, buf[out], len) → 버퍼에서 데이터 읽기 Consumer는 out만 수정 in 값 전파 (캐시 일관성 프로토콜) 핵심: smp_wmb()는 T2가 T4보다 먼저 보이게, smp_rmb()는 T5가 T7보다 먼저 읽히게 보장 → 데이터 무결성
💡

메모리 배리어의 역할: CPU는 성능을 위해 메모리 연산 순서를 재배치할 수 있습니다. 메모리 배리어는 특정 순서를 강제하여 다른 CPU에서 올바른 순서로 보이도록 보장합니다. lock은 배리어를 포함하지만, 배리어만 사용하면 훨씬 가볍습니다.

실사용 사례 (Real-World Usage)

디바이스 드라이버 버퍼링

문자 디바이스 드라이버에서 IRQ 핸들러와 read() 시스템 콜 사이의 버퍼:

/* drivers/tty/serial/serial_core.c 유사 패턴 */
struct uart_port {
    struct kfifo rx_fifo;
    /* ... */
};

/* IRQ 핸들러 (producer) */
static irqreturn_t uart_irq_handler(int irq, void *dev_id)
{
    struct uart_port *port = dev_id;
    char ch;

    /* 하드웨어에서 문자 읽기 */
    ch = readb(port->membase + UART_RX_REG);

    /* kfifo에 저장 (lock-free!) */
    kfifo_in(&port->rx_fifo, &ch, 1);

    return IRQ_HANDLED;
}

/* read() 시스템 콜 (consumer) */
static ssize_t uart_read(struct file *file, char __user *buf,
                          size_t count, loff_t *ppos)
{
    struct uart_port *port = file_private_data(file);
    char tmp[128];
    unsigned int copied;

    /* kfifo에서 읽기 (lock-free!) */
    copied = kfifo_out(&port->rx_fifo, tmp, min(count, sizeof(tmp)));

    if (copy_to_user(buf, tmp, copied))
        return -EFAULT;

    return copied;
}

커널 로그 버퍼

/* kernel/printk/printk.c 유사 패턴 */
static DEFINE_KFIFO(printk_fifo, char, 1024 * 16);

int vprintk_emit(const char *fmt, va_list args)
{
    char text[512];
    int len;

    len = vsnprintf(text, sizeof(text), fmt, args);

    /* 로그 메시지를 kfifo에 저장 */
    kfifo_in(&printk_fifo, text, len);

    /* 콘솔 출력 등 */
    console_unlock();

    return len;
}

실습 예제 (Hands-On Example)

간단한 문자 디바이스 드라이버로 kfifo를 활용하는 예제입니다:

/* chardev_kfifo.c - kfifo 기반 문자 디바이스 */
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/uaccess.h>

#define DEVICE_NAME "kfifo_dev"
#define FIFO_SIZE 1024

static dev_t dev_num;
static struct cdev c_dev;
static struct class *dev_class;
static DEFINE_KFIFO(my_fifo, char, FIFO_SIZE);

static ssize_t dev_read(struct file *f, char __user *buf,
                         size_t len, loff_t *off)
{
    char tmp[256];
    unsigned int copied;

    copied = kfifo_out(&my_fifo, tmp, min(len, sizeof(tmp)));
    if (copied == 0)
        return 0;  /* EOF */

    if (copy_to_user(buf, tmp, copied))
        return -EFAULT;

    pr_info("Read %u bytes\n", copied);
    return copied;
}

static ssize_t dev_write(struct file *f, const char __user *buf,
                          size_t len, loff_t *off)
{
    char tmp[256];
    unsigned int copied;
    size_t to_copy = min(len, sizeof(tmp));

    if (copy_from_user(tmp, buf, to_copy))
        return -EFAULT;

    copied = kfifo_in(&my_fifo, tmp, to_copy);
    pr_info("Wrote %u bytes (available: %u)\n",
            copied, kfifo_avail(&my_fifo));

    return copied;
}

static struct file_operations fops = {
    .owner = THIS_MODULE,
    .read = dev_read,
    .write = dev_write,
};

static int __init chardev_init(void)
{
    if (alloc_chrdev_region(&dev_num, 0, 1, DEVICE_NAME) < 0)
        return -1;

    cdev_init(&c_dev, &fops);
    if (cdev_add(&c_dev, dev_num, 1) < 0) {
        unregister_chrdev_region(dev_num, 1);
        return -1;
    }

    dev_class = class_create(THIS_MODULE, DEVICE_NAME);
    device_create(dev_class, NULL, dev_num, NULL, DEVICE_NAME);

    pr_info("kfifo device registered (buffer=%u bytes)\n",
            kfifo_size(&my_fifo));
    return 0;
}

static void __exit chardev_exit(void)
{
    device_destroy(dev_class, dev_num);
    class_destroy(dev_class);
    cdev_del(&c_dev);
    unregister_chrdev_region(dev_num, 1);
    pr_info("kfifo device unregistered\n");
}

module_init(chardev_init);
module_exit(chardev_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("MINZKN");
MODULE_DESCRIPTION("kfifo character device example");

사용 예제

# 모듈 로드
sudo insmod chardev_kfifo.ko

# 디바이스 확인
ls -l /dev/kfifo_dev

# 데이터 쓰기
echo "Hello, kfifo!" > /dev/kfifo_dev

# 데이터 읽기
cat /dev/kfifo_dev
# 출력: Hello, kfifo!

# 커널 로그 확인
dmesg | tail
# kfifo device registered (buffer=1024 bytes)
# Wrote 14 bytes (available: 1010)
# Read 14 bytes

성능 최적화 (Performance Optimization)

기법 설명 효과
2의 거듭제곱 크기 모듈로 대신 비트 마스킹 사용 인덱스 계산 10x 빠름
Lock-free SPSC 단일 생산자/소비자에서 lock 제거 동기화 오버헤드 제거
메모리 배리어 smp_wmb/smp_rmb로 순서 보장 락보다 저렴
연속 메모리 캐시 라인 활용 캐시 미스 감소
💡

벤치마크: lock-free kfifo는 spinlock 기반 큐보다 약 3-5배 빠르며, 특히 고빈도 IRQ 환경에서 성능 차이가 극대화됩니다.

kfifo 동시성 검증 체크리스트

kfifo는 SPSC에서 lock-free가 가능하지만, MPMC 상황에서는 반드시 외부 동기화가 필요합니다. 사용 패턴을 먼저 고정한 뒤 API를 선택해야 안전합니다.

시나리오권장 방식주의점
단일 생산자 / 단일 소비자lock-free kfifo메모리 배리어 의미를 이해하고 사용
다중 생산자 또는 다중 소비자spinlock + kfifoirq 컨텍스트면 irqsave 계열 사용
유저 공간 복사 포함kfifo_to_user/kfifo_from_user부분 복사/오류 처리 분기 점검
# 빈번한 오용 점검
git grep -n "kfifo_in\\|kfifo_out\\|kfifo_to_user\\|kfifo_from_user" -- "*.c"
git grep -n "spin_lock.*kfifo" -- "*.c"

Power-of-2 마스킹 최적화 심화

kfifo의 성능 핵심은 버퍼 크기를 2의 거듭제곱으로 강제하는 설계입니다. 이 제약 덕분에 비용이 큰 나눗셈/모듈로 연산을 비트 AND 한 번으로 대체할 수 있습니다.

2의 거듭제곱 검증 원리

커널은 size & (size - 1) == 0 패턴으로 2의 거듭제곱 여부를 판별합니다. 이 기법은 2의 거듭제곱이 항상 단 하나의 비트만 1인 점을 활용합니다.

size & (size - 1) == 0 으로 2의 거듭제곱 판별 2의 거듭제곱: size = 8 size = 8 → 0b 0000 1000 size - 1 = 7 → 0b 0000 0111 AND 결과 → 0b 0000 0000 = 0 ✓ 비트가 겹치지 않으므로 결과는 항상 0 비 거듭제곱: size = 6 size = 6 → 0b 0000 0110 size - 1 = 5 → 0b 0000 0101 AND 결과 → 0b 0000 0100 = 4 ✗ 공통 비트가 남으므로 0이 아님 모듈로 연산 vs 비트 AND 비교 index % size (나눗셈) x86: DIV 명령 20~40 사이클 ARM: UDIV 명령 2~12 사이클 index & (size-1) (비트 AND) x86: AND 명령 1 사이클 ARM: AND 명령 1 사이클
2의 거듭제곱이면 비트가 겹치지 않아 AND 결과가 0. 이를 이용해 모듈로를 비트 AND로 대체

커널 소스에서 이 검증이 어떻게 사용되는지 확인합니다:

/* include/linux/kfifo.h */

/**
 * kfifo_alloc - 동적 kfifo 할당
 * @fifo: 초기화할 fifo 포인터
 * @size: 요청 크기 (2의 거듭제곱으로 반올림됨)
 * @gfp_mask: GFP 플래그
 */
static inline int __kfifo_alloc(struct __kfifo *fifo,
    unsigned int size, size_t esize, gfp_t gfp_mask)
{
    /* 2의 거듭제곱으로 반올림 */
    size = roundup_pow_of_two(size);

    fifo->in = 0;
    fifo->out = 0;
    fifo->esize = esize;
    /* mask = size - 1: 모든 인덱스 연산에 사용 */
    fifo->mask = size - 1;

    fifo->data = kmalloc_array(esize, size, gfp_mask);
    if (!fifo->data)
        return -ENOMEM;
    return 0;
}

마스킹 인덱스 상세 동작

마스킹 연산의 핵심은 inout무한히 증가하도록 두고, 실제 배열 인덱스만 마스크로 계산하는 것입니다. 이 설계 덕분에 랩어라운드 판단이 불필요합니다.

마스킹 인덱스 연산: in/out은 계속 증가, 실제 접근은 & mask in 값 이진수 mask (0b111) in & mask 배열 인덱스 0 0b 00000 000 & 0b 111 0b 000 0 5 0b 00000 101 & 0b 111 0b 101 5 7 0b 00000 111 & 0b 111 0b 111 7 8 0b 00001 000 & 0b 111 0b 000 0 (wrap!) 13 0b 00001 101 & 0b 111 0b 101 5 16 0b 00010 000 & 0b 111 0b 000 0 (wrap!) 핵심 포인트 1. in, out은 unsigned int 전체 범위(0 ~ 2³²-1)를 사용하며, 절대 리셋하지 않음 2. 실제 배열 인덱스 = in & mask = in & (size - 1): 항상 0 ~ size-1 범위 3. 데이터 개수 = in - out: unsigned 뺄셈이므로 오버플로우에도 정확 4. CPU의 AND 명령은 1사이클, DIV 명령은 20~40사이클 → 20배 이상 성능 차이
in 값이 계속 증가하지만 & mask로 실제 인덱스는 0~7 범위로 자동 순환
/* include/linux/kfifo.h - roundup_pow_of_two 내부 */

/**
 * roundup_pow_of_two - 주어진 값보다 크거나 같은 최소 2의 거듭제곱 반환
 *
 * 예: roundup_pow_of_two(5) = 8
 *     roundup_pow_of_two(8) = 8
 *     roundup_pow_of_two(9) = 16
 */
static inline unsigned long __roundup_pow_of_two(unsigned long n)
{
    return 1UL << fls_long(n - 1);
}

/* fls_long: find last set bit (최상위 설정 비트 위치)
 * fls_long(4) = 3 (bit 2), fls_long(7) = 3
 * 1UL << 3 = 8
 */
성능 영향: kfifo는 고빈도 경로(IRQ 핸들러, 네트워크 패킷 처리)에서 호출됩니다. 초당 수백만 회 호출 시 나눗셈 1회당 20사이클 절약은 전체 시스템 처리량에 큰 영향을 미칩니다. x86에서 DIV 명령은 파이프라인 스톨을 유발하므로 AND로의 대체는 IPC(Instructions Per Cycle) 향상에도 기여합니다.

Lock-free 정확성 증명

kfifo의 SPSC(단일 생산자/단일 소비자) lock-free 동작이 정확한 이유를 메모리 순서 관점에서 증명합니다. 핵심은 smp_wmb()smp_rmb()가 올바른 위치에서 관찰 순서를 보장한다는 점입니다.

메모리 순서와 SPSC 안전성

SPSC Lock-free: 메모리 배리어 배치와 관찰 순서 생산자 (Producer) ① out 읽기 (소비자의 진행 확인) ② 빈 공간 계산: size - (in - out) ③ 데이터 쓰기: buf[in & mask] = val ④ smp_wmb() 데이터 → in 순서 보장 ⑤ in 증가: in += len in은 생산자만 수정 → 경쟁 없음 wmb가 ③→⑤ 순서를 CPU/컴파일러에 강제 소비자 (Consumer) ① in 읽기 (생산자의 진행 확인) ② smp_rmb() in 읽기 → 데이터 읽기 순서 보장 ③ 데이터 개수: in - out ④ 데이터 읽기: val = buf[out & mask] ⑤ out 증가: out += len out은 소비자만 수정 → 경쟁 없음 rmb가 ①→④ 순서를 CPU/컴파일러에 강제 소비자가 in 관찰
생산자의 wmb는 "데이터 먼저, in 나중"을 보장하고, 소비자의 rmb는 "in 먼저 확인, 데이터 나중 읽기"를 보장

이 순서 보장이 왜 안전한지 형식적으로 정리합니다:

정확성 증명 스케치:
  1. 불변식 1: in은 생산자만 수정하고, out은 소비자만 수정합니다 (변수 소유권 분리).
  2. 불변식 2: 생산자는 데이터를 쓴 후에 smp_wmb()를 호출하고, 그 후에 in을 증가시킵니다.
  3. 불변식 3: 소비자는 in을 읽은 후에 smp_rmb()를 호출하고, 그 후에 데이터를 읽습니다.
  4. 결론: 소비자가 새로운 in 값을 관찰했다면, wmb→rmb 쌍에 의해 대응하는 데이터도 반드시 관찰 가능합니다. 따라서 stale 데이터 읽기가 불가능합니다.

다중 생산자 문제

두 생산자가 동시에 kfifo에 쓰려 하면 어떤 문제가 발생하는지 봅니다:

다중 생산자 경쟁 조건 (Race Condition) 시간 → P-A in 읽기 → in=5 buf[5] = 'A' 쓰기 in = 6 갱신 P-B in 읽기 → in=5 buf[5] = 'B' 덮어씀! in = 6 갱신 문제 1: buf[5]에 A와 B가 경쟁 → 데이터 손실 문제 2: in이 6으로만 갱신 → 하나의 항목만 기록 해결: 다중 생산자에서는 외부 동기화 필수 spin_lock(&fifo_lock); kfifo_in(&fifo, &data, len); spin_unlock(&fifo_lock); 또는 kfifo_in_spinlocked() 매크로 사용 (구버전 API)
두 생산자가 같은 in 값을 읽으면 같은 슬롯에 쓰게 되어 데이터 손실 발생

__kfifo_in()__kfifo_out()의 실제 구현에서 배리어 배치를 확인합니다:

/* lib/kfifo.c - __kfifo_in() 핵심 흐름 */

static void kfifo_copy_in(struct __kfifo *fifo,
    const void *src, unsigned int len, unsigned int off)
{
    unsigned int size = fifo->mask + 1;
    unsigned int esize = fifo->esize;
    unsigned int l;

    off &= fifo->mask;  /* 실제 배열 인덱스 */
    /* esize가 0이면 바이트 단위 */
    if (esize != 0) {
        off *= esize;
        size *= esize;
        len *= esize;
    }

    l = min(len, size - off);  /* 끝까지 복사 가능한 양 */
    memcpy(fifo->data + off, src, l);       /* 첫 번째 조각 */
    memcpy(fifo->data, src + l, len - l);   /* wrap 부분 (있으면) */
}

unsigned int __kfifo_in(struct __kfifo *fifo,
    const void *buf, unsigned int len)
{
    unsigned int l;

    l = kfifo_unused(fifo);  /* 빈 공간 계산 */
    if (len > l)
        len = l;              /* 빈 공간만큼만 쓰기 */

    kfifo_copy_in(fifo, buf, len, fifo->in);

    /* ★ 핵심: 데이터 쓰기 완료 후 write barrier */
    smp_wmb();

    /* 그 후에 in 갱신 → 소비자가 새 데이터를 볼 수 있음 */
    fifo->in += len;

    return len;
}

unsigned int __kfifo_out(struct __kfifo *fifo,
    void *buf, unsigned int len)
{
    unsigned int l;

    l = fifo->in - fifo->out;  /* 사용 가능한 데이터 */
    if (len > l)
        len = l;

    /* ★ 핵심: in 읽기 후 read barrier */
    smp_rmb();

    kfifo_copy_out(fifo, buf, len, fifo->out);

    /* 데이터 읽기 완료 후 write barrier */
    smp_wmb();

    /* 그 후에 out 갱신 → 생산자가 공간 회수 가능 */
    fifo->out += len;

    return len;
}
배리어 페어링: smp_wmb()는 반드시 대응하는 smp_rmb()와 쌍을 이루어야 합니다. 생산자의 wmb(데이터→in)가 소비자의 rmb(in→데이터)와 짝을 이루어 크로스-CPU 가시성을 보장합니다. 배리어 하나만 있으면 다른 CPU에서 순서 재배치가 발생할 수 있습니다.

kfifo 내부 구조 분석

kfifo의 핵심 데이터 구조인 struct __kfifo의 각 필드와 헬퍼 함수들이 어떻게 동작하는지 상세히 분석합니다.

구조체 레이아웃

struct __kfifo 메모리 레이아웃 struct __kfifo unsigned int in 오프셋 0, 4바이트 unsigned int out 오프셋 4, 4바이트 unsigned int mask 오프셋 8, size-1 unsigned int esize 오프셋 12, 요소 크기 void *data 오프셋 16, 버퍼 포인터 데이터 버퍼 (kmalloc) [0] 데이터 [1] 데이터 [2] 데이터 [mask-1] [mask] (마지막 슬롯) 총 크기: (mask + 1) * esize 바이트 헬퍼 함수와 필드 관계 kfifo_len() = in - out kfifo_avail() = (mask+1) - (in-out) kfifo_is_empty() = (in == out) kfifo_is_full() = kfifo_len() == (mask+1) kfifo_size() = mask + 1 kfifo_esize() = esize
__kfifo 구조체: 5개 필드로 원형 버퍼의 모든 상태를 관리
/* include/linux/kfifo.h */

struct __kfifo {
    unsigned int    in;      /* 쓰기 오프셋 (생산자만 수정) */
    unsigned int    out;     /* 읽기 오프셋 (소비자만 수정) */
    unsigned int    mask;    /* size - 1, 인덱스 마스킹용 */
    unsigned int    esize;   /* 요소 크기 (바이트), 0이면 바이트 스트림 */
    void            *data;   /* 실제 데이터 버퍼 포인터 */
};

unsigned 산술과 래핑 정확성

inoutunsigned int 범위(0 ~ 232-1)를 넘어 오버플로우해도 올바르게 동작하는 이유를 분석합니다.

unsigned int 오버플로우에서도 in - out이 정확한 이유 정상 케이스 in = 100 out = 95 in - out = 100 - 95 = 5 (정확) 버퍼에 5개 데이터 존재 오버플로우 케이스 in = 3 (0xFFFFFFFF를 넘어 wrap) out = 0xFFFFFFFE in - out = 3 - 0xFFFFFFFE = 5 (정확!) unsigned 뺄셈: 모듈로 2³² 연산 수학적 근거: 모듈로 2³² 산술 unsigned int의 덧셈/뺄셈은 모듈로 2³² 연산입니다. in은 항상 증가만 하고, out도 항상 증가만 합니다 (감소 없음). 따라서 in - out ≡ (실제 삽입 횟수 - 실제 제거 횟수) mod 2³² 버퍼 크기가 2³²보다 작으므로 (최대 2³¹), 결과는 항상 실제 데이터 개수와 일치합니다. 조건: 버퍼에 동시에 2³² 이상의 데이터가 있을 수 없으면 (size ≤ 2³¹) 항상 정확.
C 언어의 unsigned 오버플로우는 정의된 동작(모듈로 2^N)이므로 in - out은 항상 정확한 데이터 수를 반환
/* include/linux/kfifo.h - 헬퍼 함수들 */

/* 현재 저장된 데이터 개수 */
static inline unsigned int kfifo_len(struct __kfifo *fifo)
{
    return fifo->in - fifo->out;  /* unsigned 뺄셈 → 자동 wrap */
}

/* 남은 빈 공간 */
static inline unsigned int kfifo_avail(struct __kfifo *fifo)
{
    return kfifo_size(fifo) - kfifo_len(fifo);
}

/* 비어 있는가? */
static inline bool kfifo_is_empty(struct __kfifo *fifo)
{
    return fifo->in == fifo->out;  /* 같으면 비어 있음 */
}

/* 가득 찼는가? */
static inline bool kfifo_is_full(struct __kfifo *fifo)
{
    return kfifo_len(fifo) > fifo->mask;
    /* mask = size-1 이므로, len > mask ↔ len >= size */
}

/* 전체 크기 */
static inline unsigned int kfifo_size(struct __kfifo *fifo)
{
    return fifo->mask + 1;
}
설계 교훈: kfifo가 in/out을 리셋하지 않는 이유는 단순합니다. 리셋에는 양쪽(생산자/소비자) 모두의 동의가 필요하고, 이는 동기화를 요구합니다. unsigned 오버플로우의 수학적 성질을 활용하면 리셋 없이도 무한히 동작할 수 있습니다.

DMA scatter/gather 연동

kfifo 데이터를 DMA 엔진으로 직접 전송할 때, 버퍼가 원형으로 wrap될 수 있으므로 scatter/gather 리스트가 필요합니다. 커널은 이를 위해 전용 DMA 헬퍼 API를 제공합니다.

DMA 영역 분할 문제

kfifo DMA 전송: wrap-around 시 scatter/gather 분할 케이스 1: 연속 영역 (단일 DMA 전송) [0] [1] [2] out [3] [4] in [5] [6] [7] → sg[0]: data+2*esize, 길이 3*esize (단일 전송) 케이스 2: wrap-around (scatter/gather 2개 필요) [0] ② [1] in [2] [3] [4] [5] [6] out ① [7] ① → sg[0]: data+6*esize, 길이 2*esize (끝까지) → sg[1]: data+0, 길이 2*esize (처음부터) DMA 엔진 scatter/gather 전송 sg[0]: 뒷부분 sg[1]: 앞부분 디바이스 버퍼 (연속)
wrap-around 시 데이터가 버퍼 끝과 시작에 나뉘므로 scatter/gather 리스트 2개가 필요

DMA 헬퍼 API

/* include/linux/kfifo.h - DMA 관련 매크로/함수 */

/**
 * kfifo_dma_in_prepare - DMA 입력을 위한 scatterlist 준비
 * @fifo: kfifo 포인터
 * @sgl: scatterlist 배열 (최소 2개 엔트리)
 * @nents: scatterlist 엔트리 수
 * @len: 전송할 데이터 길이
 *
 * 반환: 실제 사용된 scatterlist 엔트리 수 (1 또는 2)
 */
#define kfifo_dma_in_prepare(fifo, sgl, nents, len) \
({  \
    typeof((fifo) + 1) __tmp = (fifo);  \
    struct scatterlist *__sgl = (sgl);  \
    unsigned int __len = (len);  \
    const size_t __recsize = sizeof(*__tmp->type);  \
    struct __kfifo *__kfifo = &__tmp->kfifo;  \
    (__recsize) ? \
    __kfifo_dma_in_prepare_r(__kfifo, __sgl, nents, __len, __recsize) : \
    __kfifo_dma_in_prepare(__kfifo, __sgl, nents, __len);  \
})

/**
 * kfifo_dma_in_finish - DMA 입력 완료 통지
 * @fifo: kfifo 포인터
 * @len: 실제 DMA로 전송된 바이트 수
 *
 * DMA 전송 완료 후 호출하여 in 포인터를 갱신
 */
#define kfifo_dma_in_finish(fifo, len) \
    __kfifo_dma_in_finish(&(fifo)->kfifo, len)

/**
 * kfifo_dma_out_prepare - DMA 출력을 위한 scatterlist 준비
 * kfifo_dma_out_finish - DMA 출력 완료 통지 (out 갱신)
 */

실제 디바이스 드라이버에서의 DMA + kfifo 통합 예제:

/* DMA scatter/gather로 kfifo에서 디바이스로 전송 */
static int my_dma_tx(struct my_device *dev)
{
    struct scatterlist sg[2];  /* 최대 2개 (wrap 가능) */
    unsigned int nents;
    struct dma_async_tx_descriptor *desc;

    sg_init_table(sg, 2);

    /* kfifo에서 DMA용 scatterlist 준비 */
    nents = kfifo_dma_out_prepare(&dev->tx_fifo, sg, 2,
                                   kfifo_len(&dev->tx_fifo));
    if (!nents)
        return 0;  /* 전송할 데이터 없음 */

    /* scatterlist를 DMA 매핑 */
    nents = dma_map_sg(dev->dev, sg, nents, DMA_TO_DEVICE);

    /* DMA 디스크립터 생성 */
    desc = dmaengine_prep_slave_sg(dev->dma_chan, sg, nents,
                                    DMA_MEM_TO_DEV,
                                    DMA_PREP_INTERRUPT);
    desc->callback = my_dma_tx_complete;
    desc->callback_param = dev;

    /* DMA 전송 시작 */
    dmaengine_submit(desc);
    dma_async_issue_pending(dev->dma_chan);

    return 0;
}

/* DMA 완료 콜백 */
static void my_dma_tx_complete(void *param)
{
    struct my_device *dev = param;
    unsigned int len = kfifo_len(&dev->tx_fifo);

    /* DMA 완료 후 kfifo out 포인터 갱신 */
    kfifo_dma_out_finish(&dev->tx_fifo, len);

    /* DMA 매핑 해제 */
    dma_unmap_sg(dev->dev, dev->sg, 2, DMA_TO_DEVICE);
}
DMA 안전 주의: kfifo_dma_in_prepare()kfifo_dma_in_finish() 사이에 kfifo에 직접 데이터를 쓰면 안 됩니다. DMA 엔진이 물리 주소를 기반으로 전송하므로, 중간에 데이터가 변경되면 일관성이 깨집니다. 반드시 finish() 호출 후에 다음 작업을 시작하세요.

레코드 모드 (Record FIFO)

기본 kfifo는 고정 크기 요소만 처리합니다. 가변 길이 데이터(로그 메시지, 네트워크 패킷 등)를 위해 커널은 레코드 모드를 제공합니다. 각 레코드 앞에 길이 접두사를 붙여 경계를 구분합니다.

레코드 레이아웃

레코드 모드 kfifo: 가변 길이 레코드 저장 고정 크기 모드 (DECLARE_KFIFO) elem 4B elem 4B elem 4B elem 4B ... 레코드 모드 (STRUCT_KFIFO_REC_1): 1바이트 길이 접두사 len 레코드 1 (8B) len 레코드 2 (20B) len 3 (3B) ... max 255B/레코드 레코드 모드 (STRUCT_KFIFO_REC_2): 2바이트 길이 접두사 len16 레코드 1 (512B) len16 레코드 2 (64B) ... max 65535B/레코드 선언 매크로 비교 DECLARE_KFIFO(name, type, size) 고정 크기 type 요소, esize=sizeof(type) STRUCT_KFIFO_REC_1(size) 가변 길이, 1바이트 헤더, 레코드 최대 255B STRUCT_KFIFO_REC_2(size) 가변 길이, 2바이트 헤더, 레코드 최대 65535B
레코드 모드는 각 데이터 앞에 길이 접두사(1 또는 2바이트)를 저장하여 가변 길이 지원
/* include/linux/kfifo.h - 레코드 모드 선언 */

/* 1바이트 레코드 헤더: 레코드 최대 255바이트 */
struct my_rec1_fifo STRUCT_KFIFO_REC_1(256);

/* 2바이트 레코드 헤더: 레코드 최대 65535바이트 */
struct my_rec2_fifo STRUCT_KFIFO_REC_2(4096);

/* 초기화 */
INIT_KFIFO(my_fifo);

/* 레코드 넣기: 길이 접두사가 자동으로 추가됨 */
struct log_entry {
    u64 timestamp;
    u32 level;
    char msg[200];
};

struct log_entry entry = {
    .timestamp = ktime_get_ns(),
    .level = KERN_WARNING,
    .msg = "디스크 I/O 지연 감지",
};

/* kfifo_in은 sizeof(entry) 바이트 + 길이 접두사를 저장 */
kfifo_in(&my_fifo, &entry, sizeof(entry));

/* 짧은 메시지도 같은 fifo에 저장 가능 (가변 길이!) */
struct log_entry short_entry = {
    .timestamp = ktime_get_ns(),
    .level = KERN_INFO,
    .msg = "OK",
};
kfifo_in(&my_fifo, &short_entry, offsetof(struct log_entry, msg) + 3);

/* 레코드 꺼내기: 저장된 길이만큼만 복사 */
struct log_entry out_entry;
unsigned int len = kfifo_out(&my_fifo, &out_entry, sizeof(out_entry));
/* len = 실제 레코드 크기 (접두사 제외) */

/* 다음 레코드 크기 미리 확인 (꺼내지 않고) */
unsigned int next_len = kfifo_peek_len(&my_fifo);
활용 사례: 레코드 모드 kfifo는 다음과 같은 상황에서 유용합니다:
  • 커널 로그 버퍼: 가변 길이 로그 메시지 저장
  • Netlink 메시지 큐잉: 크기가 다른 netlink 메시지 버퍼링
  • USB 요청 블록: 가변 크기 URB 데이터 임시 저장
  • 오디오 프레임: 다양한 크기의 오디오 패킷 버퍼링

문자 디바이스 드라이버 통합

kfifo는 문자 디바이스 드라이버에서 사용자 공간과 커널 공간 사이의 데이터 교환 버퍼로 자주 사용됩니다. kfifo_to_user()kfifo_from_user()copy_to_user()/copy_from_user()를 내부적으로 처리합니다.

데이터 흐름

문자 디바이스 드라이버: 사용자 공간 ↔ kfifo ↔ 하드웨어 사용자 공간 write(fd, buf, len) read(fd, buf, len) poll(fd, ...) /dev/kfifo_example write 커널 kfifo kfifo_from_user() kfifo_to_user() kfifo_is_empty() 원형 버퍼 read TX 하드웨어 UART TX FIFO UART RX FIFO IRQ 인터럽트 RX file_operations 콜백과 kfifo API 매핑 .write → kfifo_from_user(&fifo, ubuf, count, &copied) .read → kfifo_to_user(&fifo, ubuf, count, &copied) .poll → kfifo_is_empty(&fifo) ? 0 : POLLIN .release → kfifo_free(&fifo)
write()→kfifo_from_user()→kfifo→kfifo_to_user()→read() 데이터 흐름

완전한 문자 디바이스 드라이버 예제

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/kfifo.h>
#include <linux/poll.h>
#include <linux/wait.h>

#define FIFO_SIZE  1024   /* 2의 거듭제곱 */
#define DEV_NAME   "kfifo_example"

static struct cdev my_cdev;
static dev_t dev_num;
static struct class *dev_class;

/* 정적 kfifo 선언 (바이트 스트림 모드) */
static DECLARE_KFIFO(my_fifo, unsigned char, FIFO_SIZE);

/* 읽기 대기 큐 */
static DECLARE_WAIT_QUEUE_HEAD(read_wq);
static DECLARE_WAIT_QUEUE_HEAD(write_wq);

/* 다중 접근 보호용 뮤텍스 */
static DEFINE_MUTEX(fifo_lock);

static ssize_t my_read(struct file *file,
    char __user *ubuf, size_t count, loff_t *ppos)
{
    int ret;
    unsigned int copied;

    /* non-blocking 모드에서 비어있으면 즉시 반환 */
    if (kfifo_is_empty(&my_fifo)) {
        if (file->f_flags & O_NONBLOCK)
            return -EAGAIN;

        /* blocking 모드: 데이터가 올 때까지 대기 */
        ret = wait_event_interruptible(read_wq,
            !kfifo_is_empty(&my_fifo));
        if (ret)
            return ret;
    }

    mutex_lock(&fifo_lock);
    ret = kfifo_to_user(&my_fifo, ubuf, count, &copied);
    mutex_unlock(&fifo_lock);

    if (ret)
        return ret;

    /* 공간이 생겼으므로 write 대기자 깨우기 */
    if (!kfifo_is_full(&my_fifo))
        wake_up_interruptible(&write_wq);

    return copied;
}

static ssize_t my_write(struct file *file,
    const char __user *ubuf, size_t count, loff_t *ppos)
{
    int ret;
    unsigned int copied;

    if (kfifo_is_full(&my_fifo)) {
        if (file->f_flags & O_NONBLOCK)
            return -EAGAIN;

        ret = wait_event_interruptible(write_wq,
            !kfifo_is_full(&my_fifo));
        if (ret)
            return ret;
    }

    mutex_lock(&fifo_lock);
    ret = kfifo_from_user(&my_fifo, ubuf, count, &copied);
    mutex_unlock(&fifo_lock);

    if (ret)
        return ret;

    /* 데이터가 들어왔으므로 read 대기자 깨우기 */
    if (!kfifo_is_empty(&my_fifo))
        wake_up_interruptible(&read_wq);

    return copied;
}

static __poll_t my_poll(struct file *file,
    struct poll_table_struct *wait)
{
    __poll_t mask = 0;

    poll_wait(file, &read_wq, wait);
    poll_wait(file, &write_wq, wait);

    if (!kfifo_is_empty(&my_fifo))
        mask |= EPOLLIN | EPOLLRDNORM;   /* 읽기 가능 */
    if (!kfifo_is_full(&my_fifo))
        mask |= EPOLLOUT | EPOLLWRNORM;  /* 쓰기 가능 */

    return mask;
}

static const struct file_operations my_fops = {
    .owner   = THIS_MODULE,
    .read    = my_read,
    .write   = my_write,
    .poll    = my_poll,
    .llseek  = noop_llseek,
};

static int __init kfifo_dev_init(void)
{
    int ret;

    INIT_KFIFO(my_fifo);

    ret = alloc_chrdev_region(&dev_num, 0, 1, DEV_NAME);
    if (ret < 0)
        return ret;

    cdev_init(&my_cdev, &my_fops);
    ret = cdev_add(&my_cdev, dev_num, 1);
    if (ret)
        goto err_cdev;

    dev_class = class_create(DEV_NAME);
    device_create(dev_class, NULL, dev_num, NULL, DEV_NAME);

    pr_info("kfifo_example: /dev/%s 생성 완료\n", DEV_NAME);
    return 0;

err_cdev:
    unregister_chrdev_region(dev_num, 1);
    return ret;
}

static void __exit kfifo_dev_exit(void)
{
    device_destroy(dev_class, dev_num);
    class_destroy(dev_class);
    cdev_del(&my_cdev);
    unregister_chrdev_region(dev_num, 1);
    pr_info("kfifo_example: 디바이스 제거\n");
}

module_init(kfifo_dev_init);
module_exit(kfifo_dev_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("kfifo 기반 문자 디바이스 예제");

사용자 공간에서 테스트:

# 모듈 로드
sudo insmod kfifo_example.ko

# 데이터 쓰기
echo "Hello kfifo!" > /dev/kfifo_example

# 데이터 읽기
cat /dev/kfifo_example
# 출력: Hello kfifo!

# non-blocking 테스트 (비어있을 때)
dd if=/dev/kfifo_example of=/dev/null bs=1 count=1 iflag=nonblock
# dd: /dev/kfifo_example: Resource temporarily unavailable
kfifo_to_user vs copy_to_user: kfifo_to_user()는 내부적으로 copy_to_user()를 호출하지만, 원형 버퍼의 wrap-around를 자동으로 처리합니다 (최대 2번의 copy_to_user() 호출). 직접 copy_to_user()를 사용하면 wrap 처리를 수동으로 해야 합니다.

네트워크 서브시스템 활용

kfifo는 네트워크 관련 드라이버에서 데이터 버퍼링에 널리 사용됩니다. 특히 UART/시리얼 드라이버에서 TX/RX 버퍼로 활용하는 패턴이 대표적입니다.

네트워크 패킷 버퍼링 패턴

UART 시리얼 드라이버: kfifo TX/RX 버퍼링 TTY 레이어 tty_write() tty_read() TX 데이터 TX kfifo UART 하드웨어 TX FIFO 레지스터 RX FIFO 레지스터 외부 RX kfifo RX 데이터 IRQ 커널 내 kfifo 사용 드라이버 예시 시리얼 드라이버 drivers/tty/serial/ uart_port.state->xmit USB 시리얼 drivers/usb/serial/ usb_serial_port kfifo IPC 메시지 큐 커널 내부 모듈 간 통신 이벤트 알림 버퍼링 핵심 패턴: IRQ 핸들러에서 kfifo_in() → 워크큐/태스클릿에서 kfifo_out() (SPSC) IRQ(생산자)와 프로세스 컨텍스트(소비자)가 각각 하나이므로 lock-free 가능 SPSC 패턴의 자연스러운 매핑: IRQ → kfifo → 워크큐/태스크
UART 드라이버의 전형적인 kfifo 활용: IRQ에서 수신, 프로세스 컨텍스트에서 처리

커널 시리얼 드라이버에서의 kfifo 활용 코드:

/* drivers/tty/serial/ 기반 UART kfifo 패턴 */

/* IRQ 핸들러: 하드웨어 RX FIFO → kfifo */
static irqreturn_t uart_rx_irq(int irq, void *dev_id)
{
    struct my_uart *uart = dev_id;
    unsigned char ch;

    while (uart_rx_ready(uart)) {
        ch = uart_read_byte(uart);

        /* lock-free: IRQ가 유일한 생산자 */
        if (kfifo_in(&uart->rx_fifo, &ch, 1) == 0)
            uart->rx_overrun++;  /* 오버런 카운터 */
    }

    /* 워크큐에서 데이터 처리 예약 */
    schedule_work(&uart->rx_work);

    return IRQ_HANDLED;
}

/* 워크큐: kfifo → TTY 레이어 */
static void uart_rx_work(struct work_struct *work)
{
    struct my_uart *uart = container_of(work,
        struct my_uart, rx_work);
    unsigned char buf[256];
    unsigned int len;

    /* lock-free: 워크큐가 유일한 소비자 */
    while ((len = kfifo_out(&uart->rx_fifo, buf, sizeof(buf)))) {
        tty_insert_flip_string(&uart->port, buf, len);
    }
    tty_flip_buffer_push(&uart->port);
}

/* TX: TTY 레이어 → kfifo → 하드웨어 */
static void uart_start_tx(struct uart_port *port)
{
    struct my_uart *uart = to_my_uart(port);
    unsigned char ch;

    while (uart_tx_ready(uart) &&
           kfifo_out(&uart->tx_fifo, &ch, 1)) {
        uart_write_byte(uart, ch);
    }
}
IPC 메시지 큐잉 패턴: 커널 모듈 간 메시지 전달에도 kfifo가 효과적입니다. 구조체를 직접 넣고 빼면 memcpy 비용만 발생하며, 별도의 메시지 할당/해제가 불필요합니다. 다만 구조체 크기가 매우 크면 포인터를 kfifo에 넣는 방식이 더 효율적입니다.

ftrace/perf 디버깅 실습

kfifo 기반 드라이버를 디버깅하고 성능을 측정하는 실무적인 방법을 다룹니다. ftrace, bpftrace, perf를 사용하여 kfifo 동작을 관찰하고 병목을 찾는 기법입니다.

ftrace로 kfifo 함수 추적

# kfifo 관련 커널 함수 목록 확인
cat /sys/kernel/debug/tracing/available_filter_functions | grep kfifo

# function tracer 설정
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo function > /sys/kernel/debug/tracing/current_tracer

# kfifo 함수만 필터링
echo '__kfifo_in __kfifo_out __kfifo_alloc' > \
    /sys/kernel/debug/tracing/set_ftrace_filter

# 추적 시작
echo 1 > /sys/kernel/debug/tracing/tracing_on

# 드라이버 동작 유발 (예: 시리얼 포트 사용)
echo "test" > /dev/ttyS0

# 추적 결과 확인
cat /sys/kernel/debug/tracing/trace

# 예상 출력:
#  kfifo_exampl-1234 [001] .... 12345.678: __kfifo_in <-uart_write
#  kfifo_exampl-1234 [001] .... 12345.679: __kfifo_out <-uart_start_tx

# 추적 종료
echo 0 > /sys/kernel/debug/tracing/tracing_on
echo nop > /sys/kernel/debug/tracing/current_tracer

bpftrace로 kfifo 채움 수준 모니터링

/* kfifo_monitor.bt - kfifo 채움 수준 실시간 모니터링 */
/* 실행: sudo bpftrace kfifo_monitor.bt */

/* __kfifo_in 진입 시 삽입 크기 히스토그램 */
kprobe:__kfifo_in
{
    @in_size = hist(arg2);  /* arg2 = len */
    @in_count = count();
}

/* __kfifo_out 진입 시 추출 크기 히스토그램 */
kprobe:__kfifo_out
{
    @out_size = hist(arg2);
    @out_count = count();
}

/* __kfifo_in 반환값으로 실제 삽입된 양 확인 */
kretprobe:__kfifo_in
{
    @actual_in = hist(retval);
    if (retval == 0) {
        @fifo_full = count();  /* kfifo가 꽉 차서 삽입 실패 */
    }
}

/* 주기적 요약 (5초마다) */
interval:s:5
{
    printf("--- kfifo 통계 (5초) ---\n");
    printf("삽입 횟수: %lld, 추출 횟수: %lld\n",
        @in_count, @out_count);
    printf("kfifo full 발생: %lld\n", @fifo_full);
    print(@in_size);
    clear(@in_count); clear(@out_count); clear(@fifo_full);
}

perf probe로 처리량 측정

# kfifo 함수에 동적 probe 추가
sudo perf probe --add '__kfifo_in len=%dx:u32'
sudo perf probe --add '__kfifo_out len=%dx:u32'

# 10초간 kfifo 호출 통계 수집
sudo perf stat -e 'probe:__kfifo_in,probe:__kfifo_out' -a sleep 10

# 예상 출력:
#  Performance counter stats for 'system wide':
#     1,234,567 probe:__kfifo_in
#     1,234,000 probe:__kfifo_out
#    10.001234 seconds time elapsed

# 호출 스택과 함께 기록
sudo perf record -g -e 'probe:__kfifo_in' -a sleep 5
sudo perf report --sort comm,symbol

# probe 정리
sudo perf probe --del '__kfifo_in'
sudo perf probe --del '__kfifo_out'

버퍼 오버플로우/언더플로우 디버깅

/* 디버깅용 kfifo 상태 출력 함수 */
static void kfifo_debug_dump(struct kfifo *fifo,
    const char *label)
{
    pr_debug("%s: in=%u out=%u len=%u avail=%u full=%d empty=%d\n",
        label,
        fifo->kfifo.in,
        fifo->kfifo.out,
        kfifo_len(fifo),
        kfifo_avail(fifo),
        kfifo_is_full(fifo),
        kfifo_is_empty(fifo));
}

/* 오버플로우 감지 래퍼 */
static unsigned int kfifo_in_debug(struct kfifo *fifo,
    const void *buf, unsigned int len)
{
    unsigned int avail = kfifo_avail(fifo);
    unsigned int ret;

    if (len > avail) {
        pr_warn("kfifo: 오버플로우 시도! 요청=%u 가용=%u\n",
            len, avail);
        dump_stack();  /* 호출 스택 출력 */
    }

    ret = kfifo_in(fifo, buf, len);

    if (ret != len)
        pr_warn("kfifo: 부분 삽입 %u/%u\n", ret, len);

    return ret;
}
프로덕션 주의: 디버깅 코드는 성능에 영향을 줍니다. pr_debug()CONFIG_DYNAMIC_DEBUG가 설정되면 런타임에 켜고 끌 수 있으므로, pr_info()printk() 대신 pr_debug()를 사용하세요. echo 'module my_driver +p' > /sys/kernel/debug/dynamic_debug/control로 활성화합니다.

kfifo vs 대안 비교

커널에는 kfifo 외에도 여러 링 버퍼 구현이 있습니다. 각각의 설계 목표와 사용 시나리오가 다르므로, 적절한 선택이 중요합니다.

주요 링 버퍼 비교

특성 kfifo ring_buffer (ftrace) io_uring SQ/CQ virtio vring
주 용도 범용 데이터 전달 이벤트 트레이싱 비동기 I/O 가상화 I/O
동시성 SPSC lock-free MPSC lock-free SPSC lock-free SPSC/MPSC
요소 크기 고정/가변 (레코드 모드) 가변 (이벤트 헤더) 고정 (SQE/CQE) 고정 (descriptor)
메모리 공유 커널 내부 커널→유저 (mmap) 커널↔유저 (mmap) 호스트↔게스트
오버플로우 처리 삽입 거부 (부분 삽입) 오래된 데이터 덮어쓰기 삽입 거부 삽입 거부
크기 제약 2의 거듭제곱 페이지 정렬 2의 거듭제곱 2의 거듭제곱
DMA 지원 scatter/gather API 없음 없음 scatter/gather
복잡도 낮음 (~400줄) 높음 (~3000줄) 중간 (~2000줄) 중간 (~1500줄)
헤더 파일 linux/kfifo.h linux/ring_buffer.h linux/io_uring.h linux/virtio_ring.h
링 버퍼 설계 패턴 비교 kfifo 단순한 in/out 2개 인덱스 데이터 복사 방식 SPSC 최적 커널 내부 전용 ring_buffer per-CPU 페이지 연결 리스트 기반 이벤트 헤더 포함 MPSC lock-free 덮어쓰기 모드 io_uring SQ/CQ 공유 메모리 (mmap) 고정 SQE/CQE 배치 제출/완료 zero-copy 가능 유저↔커널 virtio vring 3영역: desc/avail/used 간접 디스크립터 알림 억제 이기종 메모리 호스트↔게스트 선택 가이드 kfifo 선택: 커널 내부 SPSC 데이터 전달, DMA 연동, 드라이버 버퍼링 ring_buffer 선택: 이벤트 트레이싱, 다중 CPU 기록, 오래된 데이터 자동 폐기 io_uring 선택: 고성능 비동기 I/O, 사용자↔커널 간 zero-copy virtio vring 선택: 가상화 디바이스 에뮬레이션, 호스트↔게스트 통신 다중 생산자 lock-free 필요: LMAX Disruptor 패턴 또는 MPSC 전용 큐 (커널 외부 라이브러리) 고려
각 링 버퍼의 설계 목표와 적합한 사용 시나리오가 다르므로 요구사항에 맞는 선택이 중요
LMAX Disruptor 패턴: 다중 생산자/다중 소비자를 lock-free로 처리하는 고성능 패턴입니다. CAS(Compare-And-Swap) 기반 시퀀스 카운터를 사용하여 각 생산자가 고유 슬롯을 예약합니다. 커널에서는 이 패턴이 직접 구현되어 있지 않지만, ring_buffer의 MPSC 설계에서 유사한 아이디어가 사용됩니다.

소스 코드 워크스루

lib/kfifo.cinclude/linux/kfifo.h의 핵심 함수들을 호출 순서대로 분석합니다. 전체 소스는 약 400줄로, 간결하면서도 견고한 구현입니다.

주요 함수 호출 그래프

kfifo 핵심 함수 호출 그래프 할당/초기화 kfifo_alloc() __kfifo_alloc() roundup_pow_of_two kmalloc_array 삽입 (In) kfifo_in() __kfifo_in() kfifo_copy_in() smp_wmb() memcpy() x2 (wrap 분할 복사) 추출 (Out) kfifo_out() __kfifo_out() smp_rmb() kfifo_copy_out() memcpy() x2 (wrap 분할 복사) kfifo_free() __kfifo_free() kfree(data) kfifo_copy_in / kfifo_copy_out 핵심 로직 off = offset & mask; // 실제 배열 위치 l = min(len, size - off); // 끝까지 복사량 memcpy(data + off, src, l); // 1차: 끝까지 memcpy(data, src + l, len - l);// 2차: 처음부터 (wrap) len - l이 0이면 2차 memcpy는 no-op
kfifo 핵심 함수 호출 계층: alloc→in(copy_in+wmb)→out(rmb+copy_out)→free

__kfifo_alloc() 상세

/* lib/kfifo.c */

int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
        size_t esize, gfp_t gfp_mask)
{
    /*
     * 크기를 2의 거듭제곱으로 반올림
     * 예: size=6 → 8, size=9 → 16
     */
    size = roundup_pow_of_two(size);

    fifo->in = 0;
    fifo->out = 0;
    fifo->esize = esize;  /* 요소 크기 (바이트) */

    if (size < 2) {
        /* 최소 크기 2: in != out 조건으로 empty/full 구별 */
        fifo->data = NULL;
        fifo->mask = 0;
        return -EINVAL;
    }

    fifo->data = kmalloc_array(esize, size, gfp_mask);

    if (!fifo->data) {
        fifo->mask = 0;
        return -ENOMEM;
    }

    fifo->mask = size - 1;  /* 모든 인덱싱에 사용될 마스크 */

    return 0;
}

memcpy 분할 복사 상세

원형 버퍼에서 wrap이 발생하면 데이터가 버퍼 끝과 처음에 나뉩니다. kfifo_copy_in()kfifo_copy_out()은 이를 최대 2번의 memcpy()로 처리합니다.

/* lib/kfifo.c - kfifo_copy_in 분석 */

static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
        unsigned int len, unsigned int off)
{
    unsigned int size = fifo->mask + 1;  /* 전체 버퍼 크기 */
    unsigned int esize = fifo->esize;     /* 요소 크기 */
    unsigned int l;

    off &= fifo->mask;  /* 실제 배열 인덱스: off mod size */

    /* 바이트 단위로 변환 (esize가 1이 아닌 경우) */
    if (esize != 0) {
        off *= esize;
        size *= esize;
        len *= esize;
    }

    /*
     * l = 현재 위치(off)에서 버퍼 끝까지 복사 가능한 바이트 수
     *
     * 예: size=64, off=50, len=20
     *   l = min(20, 64-50) = min(20, 14) = 14
     *   1차 memcpy: buf[50..63] ← src[0..13]  (14바이트)
     *   2차 memcpy: buf[0..5]   ← src[14..19] (6바이트)
     *
     * 예: size=64, off=10, len=20
     *   l = min(20, 64-10) = min(20, 54) = 20
     *   1차 memcpy: buf[10..29] ← src[0..19] (20바이트)
     *   2차 memcpy: len-l = 0 → no-op
     */
    l = min(len, size - off);

    memcpy(fifo->data + off, src, l);        /* 1차: 끝까지 */
    memcpy(fifo->data, src + l, len - l);    /* 2차: 처음부터 */
}

/* kfifo_copy_out도 동일한 분할 로직 (방향만 반대) */
static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
        unsigned int len, unsigned int off)
{
    unsigned int size = fifo->mask + 1;
    unsigned int esize = fifo->esize;
    unsigned int l;

    off &= fifo->mask;

    if (esize != 0) {
        off *= esize;
        size *= esize;
        len *= esize;
    }

    l = min(len, size - off);

    memcpy(dst, fifo->data + off, l);        /* 1차: 끝까지 */
    memcpy(dst + l, fifo->data, len - l);    /* 2차: 처음부터 */
}

/* __kfifo_free */
void __kfifo_free(struct __kfifo *fifo)
{
    kfree(fifo->data);
    fifo->in = 0;
    fifo->out = 0;
    fifo->esize = 0;
    fifo->data = NULL;
    fifo->mask = 0;
}
성능 최적화 포인트: wrap이 발생하지 않는 경우(대부분), 두 번째 memcpy()는 길이 0으로 호출되어 컴파일러가 최적화합니다. 즉, 일반적인 경우 오버헤드 없이 단일 memcpy()만 실행됩니다. GCC와 Clang 모두 길이 0인 memcpy()를 완전히 제거합니다.
소스 코드 위치 정리:
  • include/linux/kfifo.h: 매크로, 인라인 함수, 타입 안전 래퍼 (약 700줄)
  • lib/kfifo.c: 핵심 구현 함수 (약 400줄)
  • samples/kfifo/: 커널 내장 예제 코드 (bytestream, inttype, record)
git log --oneline lib/kfifo.c로 변경 이력을 확인할 수 있습니다.

kfifo와 관련된 다른 주제를 더 깊이 이해하고 싶다면 다음 문서를 참고하세요.