Từ cơ bản đến nâng cao • Kiến trúc, cài đặt Docker & ví dụ thực chiến
v3.7+KRaftDockerTiếng Việt
Apache Kafka là một nền tảng event streaming phân tán mã nguồn mở, được thiết kế để xử lý luồng dữ liệu (stream) khối lượng lớn với thông lượng cao, độ trễ thấp và khả năng mở rộng ngang (horizontal scaling). Kafka hoạt động như một "hệ thần kinh trung ương" (central nervous system) cho dữ liệu, kết nối các nguồn sinh dữ liệu (producers) với các hệ thống tiêu thụ dữ liệu (consumers) theo mô hình publish-subscribe.
| Đặc điểm | Ý nghĩa |
|---|---|
| Thông lượng cao | Xử lý hàng triệu message/giây trên phần cứng thông thường. |
| Bền vững (Durable) | Ghi message xuống đĩa, replicate nhiều bản. |
| Phân tán (Distributed) | Chạy trên cluster nhiều broker, horizontal scale. |
| Lưu trữ dài hạn | Khác với queue truyền thống, Kafka giữ message theo retention policy. |
| Thứ tự (Ordering) | Đảm bảo thứ tự trong từng partition. |
| Replay | Consumer có thể đọc lại lịch sử bằng cách reset offset. |
| Hệ sinh thái rộng | Connect, Streams, ksqlDB, Schema Registry… |
Kafka được tạo ra tại LinkedIn năm 2010 bởi Jay Kreps, Neha Narkhede và Jun Rao để giải quyết bài toán tích hợp dữ liệu giữa hàng trăm hệ thống nội bộ. Năm 2011 được open-source và chuyển sang Apache Software Foundation. Tên "Kafka" được đặt theo nhà văn Franz Kafka vì "đây là một hệ thống tối ưu cho việc ghi chép".
Các cột mốc quan trọng:
Kafka cluster được tạo thành từ nhiều broker. Mỗi topic được chia thành các partition và mỗi partition được replicate ra nhiều broker để đảm bảo độ sẵn sàng. Trong ba bản sao của một partition, một bản được chọn làm leader (phục vụ read/write) và các bản còn lại là follower (replicate data).
Đơn vị dữ liệu nhỏ nhất trong Kafka. Một record gồm:
key — nullable, dùng để xác định partition.value — payload chính (có thể là JSON, Avro, Protobuf…).timestamp — thời điểm sự kiện.headers — metadata (tương tự HTTP headers).offset — vị trí duy nhất trong partition.Topic là "danh mục" chứa các record, tương tự "table" trong database hoặc "folder" trong filesystem. Topic là append-only log — chỉ ghi thêm, không sửa/xóa trực tiếp. Một topic có thể có nhiều producer ghi vào và nhiều consumer đọc ra cùng lúc.
Mỗi topic được chia thành partition để scale ngang. Partition là đơn vị song song hóa và đơn vị đảm bảo thứ tự. Thứ tự chỉ được đảm bảo trong cùng một partition.
Số thứ tự (long, 64-bit) duy nhất trong một partition. Consumer tự lưu offset mình đã đọc đến đâu (thường commit lên topic nội bộ __consumer_offsets).
Một server Kafka chạy process kafka.Kafka. Mỗi broker có một broker.id duy nhất trong cluster. Broker nhận request từ producer/consumer và quản lý partition được gán cho nó.
Client gửi record vào Kafka. Producer quyết định record sẽ đi vào partition nào dựa trên:
key = null: round-robin hoặc sticky partitioner.key: hash(key) % num_partitions ⟹ cùng key luôn vào cùng partition.Partitioner.Consumer đọc record từ topic. Nhiều consumer có thể nhóm lại thành Consumer Group. Trong một group, mỗi partition chỉ được gán cho đúng một consumer — đây là cơ chế cân bằng tải tự động.
Số bản sao của mỗi partition. RF=3 nghĩa là mỗi partition có 3 bản trên 3 broker khác nhau. Production thường dùng RF ≥ 3.
Tập các replica đang "đồng bộ" với leader. Chỉ các replica trong ISR mới đủ điều kiện được bầu làm leader mới khi leader chết.
Chính sách giữ message. Có thể cấu hình theo:
retention.ms — theo thời gian (mặc định 7 ngày).retention.bytes — theo dung lượng.cleanup.policy=compact — log compaction, giữ bản ghi mới nhất theo key.Trước Kafka 2.8, mọi cluster Kafka đều cần Apache Zookeeper để lưu metadata (danh sách broker, topic, partition, ACL…) và tổ chức controller election. Từ phiên bản 3.3+, KRaft (Kafka Raft) đã sẵn sàng production và thay thế hoàn toàn Zookeeper.
| Tiêu chí | Zookeeper mode | KRaft mode |
|---|---|---|
| Thành phần | Kafka + Zookeeper | Chỉ Kafka |
| Cài đặt | Phức tạp hơn | Đơn giản |
| Scale metadata | ~200k partition | Hàng triệu partition |
| Startup time | Chậm | Nhanh hơn 10× |
| Khuyến nghị | Legacy | Khuyến nghị mới |
key và value thành byte.Partitioner xác định partition đích.Sender gom các record cùng partition thành batch (batch.size, linger.ms).ack về producer theo config acks:
acks=0 — fire-and-forget (không chờ ack).acks=1 — chờ leader ghi xong.acks=all (hoặc -1) — chờ tất cả ISR ghi xong (bền vững nhất).Mỗi partition là một thư mục gồm nhiều segment file:
/var/lib/kafka/data/orders-0/
├── 00000000000000000000.log # dữ liệu
├── 00000000000000000000.index # offset index
├── 00000000000000000000.timeindex # time index
├── 00000000000000123456.log
├── 00000000000000123456.index
└── leader-epoch-checkpoint
Khi segment đạt log.segment.bytes (mặc định 1GB) hoặc hết hạn log.roll.ms, Kafka đóng segment và mở segment mới. Segment cũ sẽ bị xóa theo retention policy.
sendfile() syscall — truyền data từ page cache thẳng xuống socket, không qua user-space.Phần này hướng dẫn dựng Kafka bằng Docker theo 3 cấp độ: (A) single-node KRaft, (B) cluster 3-broker KRaft, (C) full stack với Kafka UI + Schema Registry + Kafka Connect.
docker --version
docker compose version
mkdir kafka-demo && cd kafka-demo
docker-compose.yml:
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka
ports:
- "9092:9092"
environment:
# KRaft settings
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# Topic defaults
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- kafka-data:/var/lib/kafka/data
volumes:
kafka-data:
docker compose up -d
docker compose logs -f kafka | head -80
Tìm dòng Kafka Server started là thành công.
docker exec -it kafka \
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic demo --partitions 3 --replication-factor 1
docker exec -it kafka \
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic demo
> hello
> kafka
> xin chao
docker exec -it kafka \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic demo --from-beginning
hello
kafka
xin chao
docker compose down -v (thêm -v để xóa luôn volume).Tạo file docker-compose.cluster.yml:
x-kafka-common: &kafka-common
image: apache/kafka:3.7.0
environment: &kafka-env
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
services:
kafka1:
<<: *kafka-common
container_name: kafka1
ports: ["9092:9092"]
environment:
<<: *kafka-env
KAFKA_NODE_ID: 1
KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092,EXTERNAL://localhost:9092
volumes: [kafka1-data:/var/lib/kafka/data]
kafka2:
<<: *kafka-common
container_name: kafka2
ports: ["9094:9094"]
environment:
<<: *kafka-env
KAFKA_NODE_ID: 2
KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:19092,EXTERNAL://localhost:9094
volumes: [kafka2-data:/var/lib/kafka/data]
kafka3:
<<: *kafka-common
container_name: kafka3
ports: ["9096:9096"]
environment:
<<: *kafka-env
KAFKA_NODE_ID: 3
KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9096
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:19092,EXTERNAL://localhost:9096
volumes: [kafka3-data:/var/lib/kafka/data]
volumes:
kafka1-data:
kafka2-data:
kafka3-data:
docker compose -f docker-compose.cluster.yml up -d
docker compose -f docker-compose.cluster.yml ps
docker exec -it kafka1 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:19092 \
--create --topic orders --partitions 6 --replication-factor 3
docker exec -it kafka1 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:19092 \
--describe --topic orders
Output sẽ cho thấy Leader, Replicas, ISR cho từng partition.
Tạo docker-compose.full.yml:
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka
ports: ["9092:9092"]
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://:19092,CONTROLLER://:9093,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,EXTERNAL://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
container_name: schema-registry
depends_on: [kafka]
ports: ["8081:8081"]
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:19092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
kafka-connect:
image: confluentinc/cp-kafka-connect:7.6.0
container_name: kafka-connect
depends_on: [kafka, schema-registry]
ports: ["8083:8083"]
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:19092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on: [kafka]
ports: ["8080:8080"]
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:19092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local-connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
DYNAMIC_CONFIG_ENABLED: 'true'
docker compose -f docker-compose.full.yml up -d
http://localhost:8080http://localhost:8081/subjectshttp://localhost:8083/connectorsCác script CLI nằm trong /opt/kafka/bin/ (image apache/kafka) hoặc /usr/bin/ (image Confluent). Dưới đây dùng image Apache.
# Liệt kê topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Tạo topic
kafka-topics.sh --bootstrap-server localhost:9092 --create \
--topic payments --partitions 6 --replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# Xem chi tiết
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic payments
# Tăng số partition (chỉ được tăng, không giảm)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
--topic payments --partitions 12
# Xóa topic (cần delete.topic.enable=true)
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic payments
# Producer có key
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic payments --property "parse.key=true" --property "key.separator=:"
> user1:{"amount": 100}
> user2:{"amount": 250}
# Consumer đọc từ đầu
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic payments --from-beginning \
--property print.key=true --property key.separator=" | "
# Liệt kê group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Xem lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-app
# Reset offset về đầu
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-app --topic payments --reset-offsets --to-earliest --execute
# Reset về thời điểm cụ thể
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-app --topic payments \
--reset-offsets --to-datetime 2026-04-18T00:00:00.000 --execute
# Producer benchmark
kafka-producer-perf-test.sh --topic perf-test \
--num-records 1000000 --record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092 acks=all
# Consumer benchmark
kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 \
--topic perf-test --messages 1000000
// pom.xml
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-clients</artifactId>
// <version>3.7.0</version>
// </dependency>
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
props.put("compression.type", "lz4");
props.put("linger.ms", 5);
props.put("batch.size", 16384);
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 100; i++) {
String key = "user-" + (i % 10);
String value = "{\"id\":" + i + ",\"event\":\"click\"}";
ProducerRecord<String, String> record =
new ProducerRecord<>("events", key, value);
producer.send(record, (meta, ex) -> {
if (ex != null) ex.printStackTrace();
else System.out.printf("partition=%d offset=%d%n",
meta.partition(), meta.offset());
});
}
producer.flush();
}
}
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", false);
props.put("max.poll.records", 500);
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("events"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) {
System.out.printf("key=%s value=%s partition=%d offset=%d%n",
r.key(), r.value(), r.partition(), r.offset());
}
if (!records.isEmpty()) consumer.commitSync();
}
}
}
}
Producer (confluent-kafka — khuyến nghị vì nhanh hơn):
# pip install confluent-kafka
from confluent_kafka import Producer
import json, time
conf = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'linger.ms': 5,
}
p = Producer(conf)
def delivery(err, msg):
if err: print(f'FAIL: {err}')
else: print(f'OK partition={msg.partition()} offset={msg.offset()}')
for i in range(100):
key = f'user-{i % 10}'
value = json.dumps({'id': i, 'ts': time.time()})
p.produce('events', key=key, value=value, callback=delivery)
p.poll(0)
p.flush()
Consumer:
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'py-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
})
c.subscribe(['events'])
try:
while True:
msg = c.poll(1.0)
if msg is None: continue
if msg.error():
print(msg.error()); continue
print(f'{msg.key()} -> {msg.value().decode()}')
c.commit(msg)
finally:
c.close()
// npm i kafkajs
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'demo', brokers: ['localhost:9092'] });
async function produce() {
const producer = kafka.producer({ idempotent: true });
await producer.connect();
await producer.send({
topic: 'events',
messages: [
{ key: 'user-1', value: JSON.stringify({ id: 1 }) },
{ key: 'user-2', value: JSON.stringify({ id: 2 }) },
],
acks: -1,
});
await producer.disconnect();
}
async function consume() {
const consumer = kafka.consumer({ groupId: 'node-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'events', fromBeginning: true });
await consumer.run({
eachMessage: async ({ partition, message }) => {
console.log(`p=${partition} k=${message.key} v=${message.value.toString()}`);
},
});
}
produce().then(consume);
// go get github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "events",
Balancer: &kafka.Hash{},
RequiredAcks: kafka.RequireAll,
}
defer w.Close()
err := w.WriteMessages(context.Background(),
kafka.Message{Key: []byte("k1"), Value: []byte("hello")},
)
if err != nil { log.Fatal(err) }
}
Giả sử topic orders có 6 partition:
| Số consumer trong group | Phân bố |
|---|---|
| 1 | C1 nhận cả 6 partition (0-5) |
| 2 | C1: 0,1,2 | C2: 3,4,5 |
| 3 | C1: 0,1 | C2: 2,3 | C3: 4,5 |
| 6 | Mỗi consumer 1 partition |
| 7 | 6 consumer làm việc, 1 idle |
RangeAssignor — mặc định cũ, gán theo range (có thể không đều).RoundRobinAssignor — chia đều theo round-robin.StickyAssignor — giữ assignment cũ khi rebalance.CooperativeStickyAssignor — khuyến nghị hiện nay, rebalance không cần "stop-the-world".session.timeout.ms).acks=all, producer chỉ nhận ack sau khi record committed.Controller (trong KRaft) sẽ bầu một replica trong ISR làm leader mới. Nếu ISR rỗng:
unclean.leader.election.enable=false (mặc định, an toàn) — partition offline đến khi có replica ISR hồi phục.unclean.leader.election.enable=true — cho phép bầu replica ngoài ISR ⟹ có thể mất dữ liệu nhưng giữ availability.Kết hợp acks=all và min.insync.replicas=2 (với RF=3) là cấu hình cân bằng: chịu được 1 broker chết mà không mất dữ liệu và không mất availability.
| Semantic | Ý nghĩa | Cách đạt được |
|---|---|---|
| At-most-once | Tối đa 1 lần (có thể mất) | acks=0 hoặc commit offset trước khi xử lý |
| At-least-once | Ít nhất 1 lần (có thể lặp) | acks=all, retries>0, commit sau khi xử lý |
| Exactly-once | Đúng 1 lần | Idempotent producer + Transactions |
Bật enable.idempotence=true, Kafka tự gán Producer ID (PID) và sequence number cho mỗi record ⟹ broker loại bỏ duplicate khi producer retry. Kể từ Kafka 3.0, bật mặc định.
Dùng cho pattern "consume-transform-produce":
props.put("transactional.id", "my-tx-1");
producer.initTransactions();
try {
producer.beginTransaction();
// 1. đọc từ topic input (trong cùng consumer loop)
// 2. xử lý
producer.send(new ProducerRecord<>("output", ...));
// 3. commit offset của input vào transaction
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Consumer phải cấu hình isolation.level=read_committed để chỉ đọc record đã commit.
Kafka Streams là thư viện Java cho stream processing, chạy embedded trong ứng dụng (không cần cluster riêng như Flink/Spark). Cung cấp DSL khai báo và Processor API cấp thấp.
Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wc-app");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> text = builder.stream("text-input");
text
.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
.groupBy((k, word) -> word)
.count(Materialized.as("word-counts"))
.toStream()
.to("word-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), p);
streams.start();
Kafka Streams hỗ trợ 4 loại cửa sổ: Tumbling, Hopping, Sliding và Session.
stream.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");
orders.join(users,
(order, user) -> order.withUserInfo(user)
).to("enriched-orders");
Framework tích hợp dữ liệu vào/ra Kafka mà không cần viết code. Có hai loại connector:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "secret",
"database.dbname": "shop",
"topic.prefix": "cdc",
"plugin.name": "pgoutput",
"table.include.list": "public.orders,public.customers"
}
}'
Sau khi tạo, mọi thay đổi trên bảng orders và customers sẽ được ghi vào topic cdc.public.orders, cdc.public.customers.
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "enriched-orders",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true"
}
}'
# Liệt kê
curl http://localhost:8083/connectors
# Trạng thái
curl http://localhost:8083/connectors/postgres-cdc/status
# Restart
curl -X POST http://localhost:8083/connectors/postgres-cdc/restart
# Xóa
curl -X DELETE http://localhost:8083/connectors/postgres-cdc
Vấn đề: khi producer và consumer dùng format khác nhau (JSON tự do), thay đổi schema có thể làm hỏng consumer. Schema Registry lưu trữ và quản lý version schema (Avro/Protobuf/JSON Schema), đảm bảo compatibility.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
}' \
http://localhost:8081/subjects/orders-value/versions
BACKWARD — consumer mới đọc được data cũ (mặc định).FORWARD — consumer cũ đọc được data mới.FULL — cả hai chiều.NONE — tắt kiểm tra.# pip install confluent-kafka[avro]
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
schema_str = """
{
"type": "record", "name": "Order",
"fields": [
{"name": "id", "type": "int"},
{"name": "amount", "type": "double"}
]
}
"""
sr = SchemaRegistryClient({'url': 'http://localhost:8081'})
serializer = AvroSerializer(sr, schema_str)
p = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'key.serializer': lambda k, ctx: k.encode(),
'value.serializer': serializer,
})
p.produce('orders', key='1', value={'id': 1, 'amount': 100.5})
p.flush()
KAFKA_LISTENERS: SASL_SSL://:9093
KAFKA_ADVERTISED_LISTENERS: SASL_SSL://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_SSL:SASL_SSL
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: changeit
KAFKA_SUPER_USERS: User:admin
# Cho phép user "alice" đọc từ topic "orders"
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:alice \
--operation Read --topic orders \
--group my-group
# Cho phép "bob" ghi
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:bob \
--operation Write --topic orders
| Tham số | Mặc định | Khuyến nghị production |
|---|---|---|
batch.size | 16KB | 64-256KB cho throughput |
linger.ms | 0 | 5-100ms |
compression.type | none | lz4 hoặc zstd |
acks | all | all (an toàn) |
enable.idempotence | true | true |
max.in.flight.requests.per.connection | 5 | ≤5 để giữ thứ tự khi idempotent |
buffer.memory | 32MB | 64-128MB |
fetch.min.bytes | Default 1, tăng lên 1KB-1MB |
fetch.max.wait.ms | 500ms |
max.poll.records | 500-5000 tùy workload |
max.poll.interval.ms | Tăng nếu xử lý chậm |
session.timeout.ms | 10-30s |
num.network.threads = số CPU core.num.io.threads = 2-8× số disk.socket.send.buffer.bytes, socket.receive.buffer.bytes = 1MB.log.flush.interval.messages=Long.MAX, để OS flush (fsync).Quy tắc ngón tay cái: Số partition ≥ throughput cần thiết / throughput 1 partition (thường ~10MB/s). Đừng quá 4000 partition/broker và không quá 200k/cluster (với Zookeeper); KRaft có thể nhiều hơn.
| Metric | Mô tả | Ngưỡng cảnh báo |
|---|---|---|
| UnderReplicatedPartitions | Partition có replica không đủ | > 0 |
| OfflinePartitionsCount | Partition offline | > 0 (critical) |
| ActiveControllerCount | Số controller active | Phải = 1 |
| RequestHandlerAvgIdlePercent | IO thread idle | < 20% |
| NetworkProcessorAvgIdlePercent | Network thread idle | < 20% |
| Consumer Lag | Chênh lệch offset producer vs consumer | Tăng liên tục |
| BytesInPerSec / BytesOutPerSec | Throughput | Theo dõi trend |
environment:
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: kafka
KAFKA_OPTS: "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
volumes:
- ./jmx_exporter:/opt/jmx_exporter
ports:
- "7071:7071" # Prometheus scrape
Các microservice publish event vào Kafka khi trạng thái thay đổi (e.g., OrderCreated, PaymentProcessed). Các service khác subscribe và phản ứng độc lập — decouple cao, resilient.
Clickstream, IoT telemetry → Kafka → Kafka Streams/Flink → dashboard (Grafana, Superset).
Debezium đọc WAL/binlog từ PostgreSQL/MySQL → Kafka → sink sang S3/HDFS/BigQuery.
Fluent Bit/Logstash gửi log vào Kafka → Elasticsearch/Loki/ClickHouse.
Giao dịch được stream real-time vào Kafka, Kafka Streams áp dụng rule/ML model và phát hiện anomaly trong vài ms.
domain.entity.action (ví dụ: sales.order.created).acks=all + min.insync.replicas=2 + RF=3 cho data quan trọng.auto.create.topics.enable=true trên production.unclean.leader.election trên data quan trọng.