Apache Kafka — Tài liệu kỹ thuật toàn diện

Từ cơ bản đến nâng cao • Kiến trúc, cài đặt Docker & ví dụ thực chiến

v3.7+KRaftDockerTiếng Việt

1. Giới thiệu tổng quan về Kafka

Apache Kafka là một nền tảng event streaming phân tán mã nguồn mở, được thiết kế để xử lý luồng dữ liệu (stream) khối lượng lớn với thông lượng cao, độ trễ thấp và khả năng mở rộng ngang (horizontal scaling). Kafka hoạt động như một "hệ thần kinh trung ương" (central nervous system) cho dữ liệu, kết nối các nguồn sinh dữ liệu (producers) với các hệ thống tiêu thụ dữ liệu (consumers) theo mô hình publish-subscribe.

1.1. Kafka dùng để làm gì?

1.2. Vì sao Kafka phổ biến?

Đặc điểmÝ nghĩa
Thông lượng caoXử lý hàng triệu message/giây trên phần cứng thông thường.
Bền vững (Durable)Ghi message xuống đĩa, replicate nhiều bản.
Phân tán (Distributed)Chạy trên cluster nhiều broker, horizontal scale.
Lưu trữ dài hạnKhác với queue truyền thống, Kafka giữ message theo retention policy.
Thứ tự (Ordering)Đảm bảo thứ tự trong từng partition.
ReplayConsumer có thể đọc lại lịch sử bằng cách reset offset.
Hệ sinh thái rộngConnect, Streams, ksqlDB, Schema Registry…

2. Lịch sử & bối cảnh ra đời

Kafka được tạo ra tại LinkedIn năm 2010 bởi Jay Kreps, Neha Narkhede và Jun Rao để giải quyết bài toán tích hợp dữ liệu giữa hàng trăm hệ thống nội bộ. Năm 2011 được open-source và chuyển sang Apache Software Foundation. Tên "Kafka" được đặt theo nhà văn Franz Kafka vì "đây là một hệ thống tối ưu cho việc ghi chép".

Các cột mốc quan trọng:

3. Kiến trúc tổng thể

Apache Kafka Cluster Architecture — Sơ đồ kiến trúc Scroll / zoom · Mở draw.io ↗
┌──────────────────────────────────────────┐ │ KAFKA CLUSTER │ │ │ ┌──────────┐ │ ┌────────┐ ┌────────┐ ┌────────┐ │ ┌──────────┐ │ Producer │ ───► │ │Broker 1│ │Broker 2│ │Broker 3│ │ ───► │ Consumer │ │ (App) │ │ │ │ │ │ │ │ │ │ Group A │ └──────────┘ │ │ P0 (L) │ │ P0 (F) │ │ P0 (F) │ │ └──────────┘ │ │ P1 (F) │ │ P1 (L) │ │ P1 (F) │ │ ┌──────────┐ │ │ P2 (F) │ │ P2 (F) │ │ P2 (L) │ │ ┌──────────┐ │ Producer │ ───► │ └────────┘ └────────┘ └────────┘ │ ───► │ Consumer │ │ (App) │ │ │ │ Group B │ └──────────┘ │ Topic "orders" — 3 partitions, RF=3 │ └──────────┘ └──────────────────────────────────────────┘ ▲ ▲ │ │ ┌───────┴──────┐ ┌──────┴──────┐ │ KRaft │ │ Schema │ │ Controller │ │ Registry │ └──────────────┘ └─────────────┘

Kafka cluster được tạo thành từ nhiều broker. Mỗi topic được chia thành các partition và mỗi partition được replicate ra nhiều broker để đảm bảo độ sẵn sàng. Trong ba bản sao của một partition, một bản được chọn làm leader (phục vụ read/write) và các bản còn lại là follower (replicate data).

4. Các khái niệm cốt lõi

4.1. Message / Record

Đơn vị dữ liệu nhỏ nhất trong Kafka. Một record gồm:

4.2. Topic

Topic là "danh mục" chứa các record, tương tự "table" trong database hoặc "folder" trong filesystem. Topic là append-only log — chỉ ghi thêm, không sửa/xóa trực tiếp. Một topic có thể có nhiều producer ghi vào và nhiều consumer đọc ra cùng lúc.

4.3. Partition

Mỗi topic được chia thành partition để scale ngang. Partition là đơn vị song song hóa và đơn vị đảm bảo thứ tự. Thứ tự chỉ được đảm bảo trong cùng một partition.

Topic: "orders", 3 partitions Partition 0: [msg0][msg1][msg2][msg3][msg4]... 0 1 2 3 4 (offset) Partition 1: [msg0][msg1][msg2]... 0 1 2 Partition 2: [msg0][msg1][msg2][msg3]... 0 1 2 3

4.4. Offset

Số thứ tự (long, 64-bit) duy nhất trong một partition. Consumer tự lưu offset mình đã đọc đến đâu (thường commit lên topic nội bộ __consumer_offsets).

4.5. Broker

Một server Kafka chạy process kafka.Kafka. Mỗi broker có một broker.id duy nhất trong cluster. Broker nhận request từ producer/consumer và quản lý partition được gán cho nó.

4.6. Producer

Client gửi record vào Kafka. Producer quyết định record sẽ đi vào partition nào dựa trên:

4.7. Consumer & Consumer Group

Consumer đọc record từ topic. Nhiều consumer có thể nhóm lại thành Consumer Group. Trong một group, mỗi partition chỉ được gán cho đúng một consumer — đây là cơ chế cân bằng tải tự động.

4.8. Replication Factor (RF)

Số bản sao của mỗi partition. RF=3 nghĩa là mỗi partition có 3 bản trên 3 broker khác nhau. Production thường dùng RF ≥ 3.

4.9. In-Sync Replicas (ISR)

Tập các replica đang "đồng bộ" với leader. Chỉ các replica trong ISR mới đủ điều kiện được bầu làm leader mới khi leader chết.

4.10. Retention

Chính sách giữ message. Có thể cấu hình theo:

5. Zookeeper vs KRaft

Trước Kafka 2.8, mọi cluster Kafka đều cần Apache Zookeeper để lưu metadata (danh sách broker, topic, partition, ACL…) và tổ chức controller election. Từ phiên bản 3.3+, KRaft (Kafka Raft) đã sẵn sàng production và thay thế hoàn toàn Zookeeper.

Tiêu chíZookeeper modeKRaft mode
Thành phầnKafka + ZookeeperChỉ Kafka
Cài đặtPhức tạp hơnĐơn giản
Scale metadata~200k partitionHàng triệu partition
Startup timeChậmNhanh hơn 10×
Khuyến nghịLegacyKhuyến nghị mới
Khuyến nghị: Với dự án mới, luôn dùng KRaft mode. Hướng dẫn Docker ở phần 7 sẽ chạy theo KRaft.

6. Cơ chế hoạt động chi tiết

6.1. Quy trình gửi message (Producer flow)

  1. Producer serialize keyvalue thành byte.
  2. Partitioner xác định partition đích.
  3. Record được đưa vào RecordAccumulator (buffer trong memory).
  4. Background thread Sender gom các record cùng partition thành batch (batch.size, linger.ms).
  5. Gửi request tới leader broker của partition đó.
  6. Broker ghi xuống page cache → log segment trên đĩa.
  7. Trả ack về producer theo config acks:
    • acks=0 — fire-and-forget (không chờ ack).
    • acks=1 — chờ leader ghi xong.
    • acks=all (hoặc -1) — chờ tất cả ISR ghi xong (bền vững nhất).

6.2. Cấu trúc lưu trữ trên đĩa

Mỗi partition là một thư mục gồm nhiều segment file:

/var/lib/kafka/data/orders-0/
├── 00000000000000000000.log       # dữ liệu
├── 00000000000000000000.index     # offset index
├── 00000000000000000000.timeindex # time index
├── 00000000000000123456.log
├── 00000000000000123456.index
└── leader-epoch-checkpoint

Khi segment đạt log.segment.bytes (mặc định 1GB) hoặc hết hạn log.roll.ms, Kafka đóng segment và mở segment mới. Segment cũ sẽ bị xóa theo retention policy.

6.3. Vì sao Kafka nhanh?

7. Cài đặt Kafka bằng Docker — Step by Step

Phần này hướng dẫn dựng Kafka bằng Docker theo 3 cấp độ: (A) single-node KRaft, (B) cluster 3-broker KRaft, (C) full stack với Kafka UI + Schema Registry + Kafka Connect.

7.1. Yêu cầu

Kiểm tra phiên bản
docker --version
docker compose version

7.2. Level A — Kafka single-node KRaft (nhanh nhất)

1Tạo thư mục dự án:
mkdir kafka-demo && cd kafka-demo
2Tạo file docker-compose.yml:
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft settings
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Topic defaults
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  kafka-data:
3Khởi động:
docker compose up -d
4Kiểm tra log:
docker compose logs -f kafka | head -80
Tìm dòng Kafka Server started là thành công.
5Tạo topic test:
docker exec -it kafka \
  /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic demo --partitions 3 --replication-factor 1
6Gửi/nhận message test:
Terminal Producer
docker exec -it kafka \
  /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic demo

> hello
> kafka
> xin chao
Terminal Consumer
docker exec -it kafka \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic demo --from-beginning

hello
kafka
xin chao
Gỡ bỏ: docker compose down -v (thêm -v để xóa luôn volume).

7.3. Level B — Cluster 3-broker KRaft (giống production)

Tạo file docker-compose.cluster.yml:

x-kafka-common: &kafka-common
  image: apache/kafka:3.7.0
  environment: &kafka-env
    KAFKA_PROCESS_ROLES: broker,controller
    KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    KAFKA_MIN_INSYNC_REPLICAS: 2
    KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    KAFKA_NUM_PARTITIONS: 3

services:
  kafka1:
    <<: *kafka-common
    container_name: kafka1
    ports: ["9092:9092"]
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092,EXTERNAL://localhost:9092
    volumes: [kafka1-data:/var/lib/kafka/data]

  kafka2:
    <<: *kafka-common
    container_name: kafka2
    ports: ["9094:9094"]
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:19092,EXTERNAL://localhost:9094
    volumes: [kafka2-data:/var/lib/kafka/data]

  kafka3:
    <<: *kafka-common
    container_name: kafka3
    ports: ["9096:9096"]
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 3
      KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9096
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:19092,EXTERNAL://localhost:9096
    volumes: [kafka3-data:/var/lib/kafka/data]

volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:
1Chạy cluster:
docker compose -f docker-compose.cluster.yml up -d
docker compose -f docker-compose.cluster.yml ps
2Tạo topic với RF=3:
docker exec -it kafka1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:19092 \
  --create --topic orders --partitions 6 --replication-factor 3
3Kiểm tra phân bố partition:
docker exec -it kafka1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:19092 \
  --describe --topic orders
Output sẽ cho thấy Leader, Replicas, ISR cho từng partition.

7.4. Level C — Full stack (Kafka + UI + Schema Registry + Connect)

Tạo docker-compose.full.yml:

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports: ["9092:9092"]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,EXTERNAL://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: schema-registry
    depends_on: [kafka]
    ports: ["8081:8081"]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:19092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    container_name: kafka-connect
    depends_on: [kafka, schema-registry]
    ports: ["8083:8083"]
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:19092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on: [kafka]
    ports: ["8080:8080"]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:19092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local-connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
      DYNAMIC_CONFIG_ENABLED: 'true'
1Chạy:
docker compose -f docker-compose.full.yml up -d
2Truy cập UI:
  • Kafka UI: http://localhost:8080
  • Schema Registry: http://localhost:8081/subjects
  • Kafka Connect: http://localhost:8083/connectors
Mẹo: Kafka UI (Provectus) là công cụ GUI miễn phí rất tốt để quan sát topic, partition, offset, consumer group và gửi message tay.

8. Sử dụng Kafka CLI

Các script CLI nằm trong /opt/kafka/bin/ (image apache/kafka) hoặc /usr/bin/ (image Confluent). Dưới đây dùng image Apache.

8.1. Quản lý topic

# Liệt kê topic
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Tạo topic
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic payments --partitions 6 --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# Xem chi tiết
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic payments

# Tăng số partition (chỉ được tăng, không giảm)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
  --topic payments --partitions 12

# Xóa topic (cần delete.topic.enable=true)
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic payments

8.2. Producer/Consumer console

# Producer có key
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic payments --property "parse.key=true" --property "key.separator=:"
> user1:{"amount": 100}
> user2:{"amount": 250}

# Consumer đọc từ đầu
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic payments --from-beginning \
  --property print.key=true --property key.separator=" | "

8.3. Consumer group

# Liệt kê group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Xem lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-app

# Reset offset về đầu
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-app --topic payments --reset-offsets --to-earliest --execute

# Reset về thời điểm cụ thể
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-app --topic payments \
  --reset-offsets --to-datetime 2026-04-18T00:00:00.000 --execute

8.4. Performance test

# Producer benchmark
kafka-producer-perf-test.sh --topic perf-test \
  --num-records 1000000 --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=all

# Consumer benchmark
kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 \
  --topic perf-test --messages 1000000

9. Producer & Consumer — Ví dụ code

9.1. Java Producer

// pom.xml
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-clients</artifactId>
//   <version>3.7.0</version>
// </dependency>

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        props.put("compression.type", "lz4");
        props.put("linger.ms", 5);
        props.put("batch.size", 16384);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String key = "user-" + (i % 10);
                String value = "{\"id\":" + i + ",\"event\":\"click\"}";
                ProducerRecord<String, String> record =
                    new ProducerRecord<>("events", key, value);

                producer.send(record, (meta, ex) -> {
                    if (ex != null) ex.printStackTrace();
                    else System.out.printf("partition=%d offset=%d%n",
                         meta.partition(), meta.offset());
                });
            }
            producer.flush();
        }
    }
}

9.2. Java Consumer

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", false);
        props.put("max.poll.records", 500);

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("key=%s value=%s partition=%d offset=%d%n",
                        r.key(), r.value(), r.partition(), r.offset());
                }
                if (!records.isEmpty()) consumer.commitSync();
            }
        }
    }
}

9.3. Python (kafka-python / confluent-kafka)

Producer (confluent-kafka — khuyến nghị vì nhanh hơn):

# pip install confluent-kafka
from confluent_kafka import Producer
import json, time

conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'linger.ms': 5,
}
p = Producer(conf)

def delivery(err, msg):
    if err: print(f'FAIL: {err}')
    else: print(f'OK partition={msg.partition()} offset={msg.offset()}')

for i in range(100):
    key = f'user-{i % 10}'
    value = json.dumps({'id': i, 'ts': time.time()})
    p.produce('events', key=key, value=value, callback=delivery)
    p.poll(0)
p.flush()

Consumer:

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'py-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
c.subscribe(['events'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None: continue
        if msg.error():
            print(msg.error()); continue
        print(f'{msg.key()} -> {msg.value().decode()}')
        c.commit(msg)
finally:
    c.close()

9.4. Node.js (kafkajs)

// npm i kafkajs
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'demo', brokers: ['localhost:9092'] });

async function produce() {
  const producer = kafka.producer({ idempotent: true });
  await producer.connect();
  await producer.send({
    topic: 'events',
    messages: [
      { key: 'user-1', value: JSON.stringify({ id: 1 }) },
      { key: 'user-2', value: JSON.stringify({ id: 2 }) },
    ],
    acks: -1,
  });
  await producer.disconnect();
}

async function consume() {
  const consumer = kafka.consumer({ groupId: 'node-group' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(`p=${partition} k=${message.key} v=${message.value.toString()}`);
    },
  });
}

produce().then(consume);

9.5. Go (segmentio/kafka-go)

// go get github.com/segmentio/kafka-go
package main

import (
    "context"
    "log"
    "github.com/segmentio/kafka-go"
)

func main() {
    w := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "events",
        Balancer:     &kafka.Hash{},
        RequiredAcks: kafka.RequireAll,
    }
    defer w.Close()

    err := w.WriteMessages(context.Background(),
        kafka.Message{Key: []byte("k1"), Value: []byte("hello")},
    )
    if err != nil { log.Fatal(err) }
}

10. Consumer Group & Rebalance

10.1. Nguyên tắc phân chia partition

Giả sử topic orders có 6 partition:

Số consumer trong groupPhân bố
1C1 nhận cả 6 partition (0-5)
2C1: 0,1,2 | C2: 3,4,5
3C1: 0,1 | C2: 2,3 | C3: 4,5
6Mỗi consumer 1 partition
76 consumer làm việc, 1 idle
Quan trọng: Số consumer trong một group không nên vượt quá số partition, nếu không sẽ có consumer idle.

10.2. Các chiến lược assignor

10.3. Rebalance xảy ra khi nào?

11. Replication, ISR & Leader Election

11.1. Luồng replication

  1. Producer gửi record tới leader của partition.
  2. Leader ghi vào local log.
  3. Các follower liên tục fetch data từ leader (pull-based).
  4. Khi tất cả replica trong ISR đã sao chép record, record được gọi là "committed".
  5. Nếu acks=all, producer chỉ nhận ack sau khi record committed.
  6. Consumer chỉ được đọc record đã committed (high watermark).

11.2. Khi leader chết

Controller (trong KRaft) sẽ bầu một replica trong ISR làm leader mới. Nếu ISR rỗng:

11.3. min.insync.replicas

Kết hợp acks=allmin.insync.replicas=2 (với RF=3) là cấu hình cân bằng: chịu được 1 broker chết mà không mất dữ liệu và không mất availability.

12. Delivery Semantics

SemanticÝ nghĩaCách đạt được
At-most-onceTối đa 1 lần (có thể mất)acks=0 hoặc commit offset trước khi xử lý
At-least-onceÍt nhất 1 lần (có thể lặp)acks=all, retries>0, commit sau khi xử lý
Exactly-onceĐúng 1 lầnIdempotent producer + Transactions

12.1. Idempotent Producer

Bật enable.idempotence=true, Kafka tự gán Producer ID (PID) và sequence number cho mỗi record ⟹ broker loại bỏ duplicate khi producer retry. Kể từ Kafka 3.0, bật mặc định.

12.2. Transactions (Exactly-once)

Dùng cho pattern "consume-transform-produce":

props.put("transactional.id", "my-tx-1");
producer.initTransactions();

try {
    producer.beginTransaction();
    // 1. đọc từ topic input (trong cùng consumer loop)
    // 2. xử lý
    producer.send(new ProducerRecord<>("output", ...));
    // 3. commit offset của input vào transaction
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Consumer phải cấu hình isolation.level=read_committed để chỉ đọc record đã commit.

13. Kafka Streams

Kafka Streams là thư viện Java cho stream processing, chạy embedded trong ứng dụng (không cần cluster riêng như Flink/Spark). Cung cấp DSL khai báo và Processor API cấp thấp.

13.1. Khái niệm KStream vs KTable

13.2. Ví dụ WordCount

Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wc-app");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> text = builder.stream("text-input");

text
  .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
  .groupBy((k, word) -> word)
  .count(Materialized.as("word-counts"))
  .toStream()
  .to("word-output", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), p);
streams.start();

13.3. Windowing

Kafka Streams hỗ trợ 4 loại cửa sổ: Tumbling, Hopping, Sliding và Session.

stream.groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count();

13.4. Stream-Table Join

KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");

orders.join(users,
   (order, user) -> order.withUserInfo(user)
).to("enriched-orders");

14. Kafka Connect

Framework tích hợp dữ liệu vào/ra Kafka mà không cần viết code. Có hai loại connector:

14.1. Ví dụ: CDC từ PostgreSQL bằng Debezium

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-cdc",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "secret",
      "database.dbname": "shop",
      "topic.prefix": "cdc",
      "plugin.name": "pgoutput",
      "table.include.list": "public.orders,public.customers"
    }
  }'

Sau khi tạo, mọi thay đổi trên bảng orderscustomers sẽ được ghi vào topic cdc.public.orders, cdc.public.customers.

14.2. Ví dụ: Sink sang Elasticsearch

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "es-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "enriched-orders",
      "connection.url": "http://elasticsearch:9200",
      "type.name": "_doc",
      "key.ignore": "true",
      "schema.ignore": "true"
    }
  }'

14.3. Quản lý connector

# Liệt kê
curl http://localhost:8083/connectors

# Trạng thái
curl http://localhost:8083/connectors/postgres-cdc/status

# Restart
curl -X POST http://localhost:8083/connectors/postgres-cdc/restart

# Xóa
curl -X DELETE http://localhost:8083/connectors/postgres-cdc

15. Schema Registry & Avro

Vấn đề: khi producer và consumer dùng format khác nhau (JSON tự do), thay đổi schema có thể làm hỏng consumer. Schema Registry lưu trữ và quản lý version schema (Avro/Protobuf/JSON Schema), đảm bảo compatibility.

15.1. Đăng ký schema Avro

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
  }' \
  http://localhost:8081/subjects/orders-value/versions

15.2. Compatibility modes

15.3. Python + Avro

# pip install confluent-kafka[avro]
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
{
  "type": "record", "name": "Order",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "amount", "type": "double"}
  ]
}
"""
sr = SchemaRegistryClient({'url': 'http://localhost:8081'})
serializer = AvroSerializer(sr, schema_str)

p = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': lambda k, ctx: k.encode(),
    'value.serializer': serializer,
})
p.produce('orders', key='1', value={'id': 1, 'amount': 100.5})
p.flush()

16. Bảo mật

16.1. Các lớp bảo mật

16.2. Ví dụ cấu hình SASL/SCRAM

KAFKA_LISTENERS: SASL_SSL://:9093
KAFKA_ADVERTISED_LISTENERS: SASL_SSL://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_SSL:SASL_SSL
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: changeit
KAFKA_SUPER_USERS: User:admin

16.3. ACL ví dụ

# Cho phép user "alice" đọc từ topic "orders"
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:alice \
  --operation Read --topic orders \
  --group my-group

# Cho phép "bob" ghi
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:bob \
  --operation Write --topic orders

17. Hiệu năng & Tối ưu

17.1. Producer tuning

Tham sốMặc địnhKhuyến nghị production
batch.size16KB64-256KB cho throughput
linger.ms05-100ms
compression.typenonelz4 hoặc zstd
acksallall (an toàn)
enable.idempotencetruetrue
max.in.flight.requests.per.connection5≤5 để giữ thứ tự khi idempotent
buffer.memory32MB64-128MB

17.2. Consumer tuning

fetch.min.bytesDefault 1, tăng lên 1KB-1MB
fetch.max.wait.ms500ms
max.poll.records500-5000 tùy workload
max.poll.interval.msTăng nếu xử lý chậm
session.timeout.ms10-30s

17.3. Broker tuning

17.4. Thiết kế partition

Quy tắc ngón tay cái: Số partition ≥ throughput cần thiết / throughput 1 partition (thường ~10MB/s). Đừng quá 4000 partition/broker và không quá 200k/cluster (với Zookeeper); KRaft có thể nhiều hơn.

18. Giám sát

18.1. Metrics quan trọng

MetricMô tảNgưỡng cảnh báo
UnderReplicatedPartitionsPartition có replica không đủ> 0
OfflinePartitionsCountPartition offline> 0 (critical)
ActiveControllerCountSố controller activePhải = 1
RequestHandlerAvgIdlePercentIO thread idle< 20%
NetworkProcessorAvgIdlePercentNetwork thread idle< 20%
Consumer LagChênh lệch offset producer vs consumerTăng liên tục
BytesInPerSec / BytesOutPerSecThroughputTheo dõi trend

18.2. Stack monitoring phổ biến

18.3. Cấu hình JMX Exporter

environment:
  KAFKA_JMX_PORT: 9999
  KAFKA_JMX_HOSTNAME: kafka
  KAFKA_OPTS: "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
volumes:
  - ./jmx_exporter:/opt/jmx_exporter
ports:
  - "7071:7071"   # Prometheus scrape

19. Use cases thực tế

19.1. Event-driven Microservices

Các microservice publish event vào Kafka khi trạng thái thay đổi (e.g., OrderCreated, PaymentProcessed). Các service khác subscribe và phản ứng độc lập — decouple cao, resilient.

19.2. Real-time Analytics / Dashboard

Clickstream, IoT telemetry → Kafka → Kafka Streams/Flink → dashboard (Grafana, Superset).

19.3. CDC & Data Lake

Debezium đọc WAL/binlog từ PostgreSQL/MySQL → Kafka → sink sang S3/HDFS/BigQuery.

19.4. Log Aggregation

Fluent Bit/Logstash gửi log vào Kafka → Elasticsearch/Loki/ClickHouse.

19.5. Fraud Detection

Giao dịch được stream real-time vào Kafka, Kafka Streams áp dụng rule/ML model và phát hiện anomaly trong vài ms.

20. Best Practices & Anti-patterns

20.1. Nên làm

20.2. Tránh

20.3. Checklist production

  • ☑ Tối thiểu 3 broker.
  • ☑ RF ≥ 3, min.insync.replicas = 2.
  • ☑ TLS + SASL authentication bật.
  • ☑ ACL được cấu hình theo principle of least privilege.
  • ☑ Monitoring + alerting (Prometheus/Grafana).
  • ☑ Backup metadata (topic config, ACL).
  • ☑ Disk usage alert > 70%.
  • ☑ JVM heap ≤ 8GB, page cache đủ lớn.
  • ☑ Test failover broker.
  • ☑ Tài liệu hóa topic catalog.

21. Tham khảo