Kafka Streams ile Gerçek Zamanlı Veri İşleme
Üretim ortamında Kafka Streams kullanmaya başladığımda, ilk tepkim “bu kadar mı basit?” olmuştu. Sonra tabii gerçek yük altında sistemin nasıl davrandığını görünce cümlenin ikinci yarısını yazmak zorunda kaldım: “bu kadar mı karmaşık?” İşte bu yazıda o iki uç arasındaki deneyimlerimi ve öğrendiklerimi paylaşacağım.
Kafka Streams Nedir ve Neden Kullanmalıyız?
Kafka Streams, Apache Kafka üzerine inşa edilmiş bir istemci kütüphanesidir. Ayrı bir cluster, ayrı bir servis, ayrı bir deployment gerektirmez. Uygulamanızın içine gömülü çalışır. Bu özelliği onu Flink veya Spark Streaming’den ayıran en kritik noktadır.
Şöyle düşünün: Bir e-ticaret platformunun fraud detection sistemini kuruyorsunuz. Her saniye binlerce sipariş geliyor, bunları gerçek zamanlı olarak analiz etmeniz lazım. Flink cluster’ı ayağa kaldırmak, yönetmek, scale etmek ayrı bir DevOps yükü. Kafka Streams ile bunu doğrudan Spring Boot veya düz Java uygulamanıza gömmek mümkün. Container’ınız başlar, Kafka’ya bağlanır, işlemeye başlar.
Temel kavramlara hızlıca bakalım:
- KStream: Sonsuz veri akışı. Her kayıt bağımsız bir olay.
- KTable: Veri akışının anlık görüntüsü, tablo benzeri yapı. Her yeni kayıt aynı key için öncekinin üzerine yazar.
- GlobalKTable: Tüm partition’lardan veri okuyan KTable. Join işlemlerinde sıkça kullanılır.
- Topology: Stream işleme pipeline’ının tanımı.
- State Store: Lokal RocksDB tabanlı durum deposu.
Ortam Kurulumu
Önce yerel geliştirme ortamını kuralım. Docker Compose ile hızlıca bir Kafka ortamı ayağa kaldırıyoruz:
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_LOG_RETENTION_HOURS: 24
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
EOF
docker-compose up -d
Maven bağımlılıklarını ekleyelim. pom.xml içine şunları ekliyoruz:
cat > pom_dependencies.xml << 'EOF'
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
EOF
İlk Stream Uygulaması: Sipariş Filtreleme
Gerçek dünya senaryosuyla başlayalım. E-ticaret platformumuzda “orders” topic’inden gelen siparişleri işleyeceğiz. 500 TL üzeri siparişleri “high-value-orders” topic’ine yönlendireceğiz.
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
public class OrderFilterApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-filter-v1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
// Tam olarak bir kez semantiği için
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
// Yüksek değerli siparişleri filtrele
KStream<String, String> highValueOrders = orders.filter(
(orderId, orderJson) -> {
double amount = parseAmount(orderJson);
return amount > 500.0;
}
);
highValueOrders.to("high-value-orders");
// Düşük değerli olanları da loglayalım
orders.filterNot(
(orderId, orderJson) -> parseAmount(orderJson) > 500.0
).to("standard-orders");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}
private static double parseAmount(String orderJson) {
// Basit JSON parse - production'da ObjectMapper kullanın
try {
return Double.parseDouble(
orderJson.split(""amount":")[1].split("[,}]")[0].trim()
);
} catch (Exception e) {
return 0.0;
}
}
}
Bu uygulamayı test etmek için topic’lere veri gönderelim:
# Test verisi üretelim
kafka-console-producer.sh
--bootstrap-server localhost:9092
--topic orders
--property "key.separator=:"
--property "parse.key=true" << 'EOF'
order-001:{"orderId":"order-001","amount":750.50,"userId":"user-42"}
order-002:{"orderId":"order-002","amount":120.00,"userId":"user-17"}
order-003:{"orderId":"order-003","amount":1250.00,"userId":"user-99"}
EOF
# Sonuçları izleyelim
kafka-console-consumer.sh
--bootstrap-server localhost:9092
--topic high-value-orders
--from-beginning
--property print.key=true
Windowed Aggregation: Gerçek Zamanlı Fraud Detection
Şimdi daha karmaşık bir senaryo: Kullanıcı başına son 5 dakika içindeki toplam harcamayı hesaplayacağız. 2000 TL üzerinde harcama yapan kullanıcıları fraud-alerts topic’ine atacağız.
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> orderAmounts = builder
.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(orderJson -> parseAmount(orderJson))
.selectKey((orderId, amount) -> extractUserId(orderJson));
// 5 dakikalık tumbling window ile aggregation
TimeWindowedKStream<String, Double> windowedStream = orderAmounts
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)));
KTable<Windowed<String>, Double> userSpending = windowedStream
.aggregate(
() -> 0.0,
(userId, newAmount, totalAmount) -> totalAmount + newAmount,
Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
"user-spending-store"
).withValueSerde(Serdes.Double())
);
// 2000 TL üzeri harcamaları fraud alert olarak işaretle
userSpending.toStream()
.filter((windowedKey, total) -> total > 2000.0)
.map((windowedKey, total) -> KeyValue.pair(
windowedKey.key(),
String.format(
"{"userId":"%s","total":%.2f,"windowStart":%d}",
windowedKey.key(),
total,
windowedKey.window().start()
)
))
.to("fraud-alerts");
Burada dikkat edilmesi gereken nokta: Grace period ayarı. Üretim ortamında ağ gecikmeleri ve consumer lag nedeniyle geç gelen mesajlar olabilir. Grace period olmadan bu mesajlar işlenmez, düşürülür.
// Grace period ile daha güvenilir window tanımı
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30)
)
State Store Kullanımı ve Interactive Queries
Kafka Streams’in güçlü özelliklerinden biri state store’lara doğrudan sorgu atabilmek. Bu sayede stream işleme uygulamanız aynı zamanda basit bir sorgulama API’si sunabilir.
// State store'u Materialized olarak tanımla
KTable<String, Long> orderCount = builder
.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((k, v) -> extractUserId(v))
.groupByKey()
.count(Materialized.as("order-count-store"));
// Streams başlatıldıktan sonra store'a sorgu at
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Store hazır olana kadar bekle
streams.cleanUp();
// REST endpoint'ten veya başka bir yerden store'a eriş
ReadOnlyKeyValueStore<String, Long> store = streams
.store(StoreQueryParameters.fromNameAndType(
"order-count-store",
QueryableStoreTypes.keyValueStore()
));
// Belirli kullanıcının sipariş sayısını sorgula
Long count = store.get("user-42");
System.out.println("Kullanıcı user-42'nin sipariş sayısı: " + count);
// Tüm kayıtları iterate et
KeyValueIterator<String, Long> all = store.all();
while (all.hasNext()) {
KeyValue<String, Long> next = all.next();
System.out.printf("Kullanıcı: %s, Sipariş: %d%n",
next.key, next.value);
}
all.close();
Stream-Table Join: Kullanıcı Profiliyle Zenginleştirme
Üretim ortamında sıkça ihtiyaç duyduğunuz senaryo: Gelen olayları başka bir veri kaynağıyla zenginleştirmek. Siparişlere kullanıcı bilgilerini ekleyelim.
StreamsBuilder builder = new StreamsBuilder();
// Siparişler stream olarak
KStream<String, String> orders = builder.stream("orders");
// Kullanıcı profilleri GlobalKTable olarak (tüm partition'lardan oku)
GlobalKTable<String, String> userProfiles = builder
.globalTable("user-profiles",
Materialized.as("user-profiles-store")
);
// Join işlemi: userId'ye göre eşleştir
KStream<String, String> enrichedOrders = orders.join(
userProfiles,
// Sol taraf (order) için key extractor
(orderId, orderJson) -> extractUserId(orderJson),
// Join fonksiyonu
(orderJson, userProfileJson) -> {
return String.format(
"{"order":%s,"userProfile":%s}",
orderJson,
userProfileJson
);
}
);
enrichedOrders.to("enriched-orders");
// Kullanıcı profili olmayan siparişler için branch
KStream<String, String>[] branches = orders.branch(
(k, v) -> userProfileExists(v),
(k, v) -> true
);
branches[1].to("orders-without-profile");
GlobalKTable kullanmanın bir maliyeti var: Tüm veriyi her instance’a çekiyor. Kullanıcı profilleriniz çok büyükse standart KTable ve repartition tercih edin.
Üretim Ortamı Konfigürasyonu
Yerel test ile üretim arasındaki en büyük fark konfigürasyon. İşte gerçek hayatta kullandığım ayarlar:
# Kafka Streams üretim konfigürasyonu
cat > streams-prod.properties << 'EOF'
application.id=fraud-detection-prod
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
# Performans ayarları
num.stream.threads=4
buffered.records.per.partition=1000
cache.max.bytes.buffering=10485760
# Güvenilirlik
processing.guarantee=exactly_once_v2
replication.factor=3
min.insync.replicas=2
# State store için RocksDB dizini
state.dir=/data/kafka-streams
# Consumer ayarları
auto.offset.reset=latest
fetch.min.bytes=50000
fetch.max.wait.ms=500
# Producer ayarları
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
# Monitoring
metrics.recording.level=INFO
EOF
Thread sayısı için altın kural: Topic partition sayısını geçme. 8 partition’lı bir topic için 8’den fazla thread açmanın anlamı yok, fazla thread’ler boşta bekler.
Monitoring ve Operasyonel Yönetim
Üretimde Kafka Streams uygulamalarını izlemek için JMX metrikleri altın değerinde:
# JMX üzerinden metrik çek
cat > jmx-monitor.sh << 'EOF'
#!/bin/bash
JMX_HOST="localhost"
JMX_PORT="9999"
# Stream thread durumlarını kontrol et
java -jar jmxterm.jar << JMXEOF
open ${JMX_HOST}:${JMX_PORT}
# Process rate metriği
get -b kafka.streams:type=stream-metrics,client-id=fraud-detection-prod
process-rate
# Record lag
get -b kafka.streams:type=stream-task-metrics,*
records-lag
# State store boyutu
get -b kafka.streams:type=stream-state-metrics,*
rocksdb-size-all-mem-tables
JMXEOF
EOF
chmod +x jmx-monitor.sh
Prometheus ile entegrasyon için JMX Exporter kullanıyoruz:
# JMX Exporter konfigürasyonu
cat > jmx-exporter-config.yml << 'EOF'
rules:
- pattern: "kafka.streams<type=stream-metrics, client-id=(.+)><>(\.+):"
name: "kafka_streams_$2"
labels:
client_id: "$1"
- pattern: "kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>records-lag"
name: "kafka_streams_task_records_lag"
labels:
client_id: "$1"
task_id: "$2"
- pattern: "kafka.streams<type=stream-processor-node-metrics.*><>forward-rate"
name: "kafka_streams_processor_forward_rate"
EOF
# Uygulamayı JMX Exporter ile başlat
java -javaagent:jmx_prometheus_javaagent.jar=8080:jmx-exporter-config.yml
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9999
-jar fraud-detection.jar
Hata Yönetimi ve Uncaught Exception Handler
Üretimde karşılaştığım en sık sorun: Stream thread’inin beklenmedik bir exception nedeniyle ölmesi. Varsayılan davranış uygulamayı durdurmak. Bunu değiştirmeliyiz:
streams.setUncaughtExceptionHandler((exception) -> {
// Hatayı logla ve alerting sistemine gönder
logger.error("Stream thread'i beklenmedik hata ile karşılaştı",
exception);
alertingService.sendAlert(
"KafkaStreams Thread Failure",
exception.getMessage()
);
// Hatanın türüne göre karar ver
if (exception instanceof SerializationException) {
// Serialization hatası: mesajı atla, devam et
return StreamThreadExceptionResponse.REPLACE_THREAD;
} else if (exception instanceof InvalidStateStoreException) {
// State store hatası: yeniden başlat
return StreamThreadExceptionResponse.REPLACE_THREAD;
} else {
// Bilinmeyen hata: uygulamayı durdur
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
});
// Deserializasyon hatalarında mesajı dead-letter topic'e gönder
props.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class
);
State Store Yönetimi ve Rebalancing
Kafka Streams uygulamalarında en pahalı operasyon state store’ların rebalancing sırasında yeniden yüklenmesi. Bunu minimize etmek için standby replica kullanın:
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
Bu ayarla her state store’un bir yedek kopyası başka bir instance’ta tutulur. Rebalancing sırasında yeni aktif instance sıfırdan restore etmek zorunda kalmaz.
State store boyutunu periyodik kontrol edin:
# RocksDB state store boyutlarını kontrol et
du -sh /data/kafka-streams/fraud-detection-prod/*/
# Çıktı örneği:
# 2.3G /data/kafka-streams/fraud-detection-prod/0_0/
# 2.1G /data/kafka-streams/fraud-detection-prod/0_1/
State store çok büyüdüğünde RocksDB compaction ayarlarını tuning etmeniz gerekebilir. Bu ayrı bir yazının konusu ama şunu söyleyeyim: varsayılan RocksDB ayarları yüksek yazma trafiğinde yeterli gelmez.
Deployment Stratejileri
Kafka Streams uygulamalarını rolling update ile deploy edebilirsiniz. Aynı application.id ile birden fazla instance çalıştırıldığında Kafka bunları otomatik koordine eder:
# Kubernetes deployment örneği
cat > kafka-streams-deployment.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
name: fraud-detection
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 1
template:
spec:
containers:
- name: fraud-detection
image: myregistry/fraud-detection:1.2.0
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka:9092"
- name: STREAMS_THREADS
value: "4"
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
volumeMounts:
- name: state-store
mountPath: /data/kafka-streams
volumes:
- name: state-store
persistentVolumeClaim:
claimName: streams-state-pvc
EOF
State store için PersistentVolume kullanmanız kritik önem taşıyor. Pod yeniden başladığında state’i sıfırdan oluşturmak yerine diskten okuyacak, bu başlangıç süresini önemli ölçüde kısaltıyor.
Sonuç
Kafka Streams, doğru anlaşıldığında gerçekten güçlü bir araç. Ayrı bir cluster yönetme derdi olmadan stream processing yapabilmek, özellikle küçük ve orta ölçekli ekipler için büyük avantaj. Ancak şunları aklınızda tutun:
- State store boyutları ve RocksDB yönetimi göz ardı edilmez. Üretimde mutlaka monitor edin.
- Exactly-once semantics performans maliyeti getirir. Gerçekten ihtiyacınız var mı, değerlendirin.
- Rebalancing süreleri büyük state store’larla uzayabilir. Standby replica bu acıyı hafifletir.
- Windowing semantiğini, özellikle grace period kavramını iyi anlayın. Geç gelen mesajlar sessizce düşer.
- Thread sayısını partition sayısıyla sınırlı tutun.
Kafka Streams’i ciddiye almak istiyorsanız “Kafka: The Definitive Guide” kitabının streams bölümlerine ve Confluent’in resmi dokümantasyonuna mutlaka başvurun. Burada aktardıklarım üretim deneyimimin özeti, ama her sistemin kendi karakteri var ve keşfetmesi gereken kendi köşeleri.
