Kafka Streams ile Gerçek Zamanlı Veri İşleme

Üretim ortamında Kafka Streams kullanmaya başladığımda, ilk tepkim “bu kadar mı basit?” olmuştu. Sonra tabii gerçek yük altında sistemin nasıl davrandığını görünce cümlenin ikinci yarısını yazmak zorunda kaldım: “bu kadar mı karmaşık?” İşte bu yazıda o iki uç arasındaki deneyimlerimi ve öğrendiklerimi paylaşacağım.

Kafka Streams Nedir ve Neden Kullanmalıyız?

Kafka Streams, Apache Kafka üzerine inşa edilmiş bir istemci kütüphanesidir. Ayrı bir cluster, ayrı bir servis, ayrı bir deployment gerektirmez. Uygulamanızın içine gömülü çalışır. Bu özelliği onu Flink veya Spark Streaming’den ayıran en kritik noktadır.

Şöyle düşünün: Bir e-ticaret platformunun fraud detection sistemini kuruyorsunuz. Her saniye binlerce sipariş geliyor, bunları gerçek zamanlı olarak analiz etmeniz lazım. Flink cluster’ı ayağa kaldırmak, yönetmek, scale etmek ayrı bir DevOps yükü. Kafka Streams ile bunu doğrudan Spring Boot veya düz Java uygulamanıza gömmek mümkün. Container’ınız başlar, Kafka’ya bağlanır, işlemeye başlar.

Temel kavramlara hızlıca bakalım:

  • KStream: Sonsuz veri akışı. Her kayıt bağımsız bir olay.
  • KTable: Veri akışının anlık görüntüsü, tablo benzeri yapı. Her yeni kayıt aynı key için öncekinin üzerine yazar.
  • GlobalKTable: Tüm partition’lardan veri okuyan KTable. Join işlemlerinde sıkça kullanılır.
  • Topology: Stream işleme pipeline’ının tanımı.
  • State Store: Lokal RocksDB tabanlı durum deposu.

Ortam Kurulumu

Önce yerel geliştirme ortamını kuralım. Docker Compose ile hızlıca bir Kafka ortamı ayağa kaldırıyoruz:

cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LOG_RETENTION_HOURS: 24

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
EOF

docker-compose up -d

Maven bağımlılıklarını ekleyelim. pom.xml içine şunları ekliyoruz:

cat > pom_dependencies.xml << 'EOF'
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.9</version>
    </dependency>
</dependencies>
EOF

İlk Stream Uygulaması: Sipariş Filtreleme

Gerçek dünya senaryosuyla başlayalım. E-ticaret platformumuzda “orders” topic’inden gelen siparişleri işleyeceğiz. 500 TL üzeri siparişleri “high-value-orders” topic’ine yönlendireceğiz.

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

public class OrderFilterApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-filter-v1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        // Tam olarak bir kez semantiği için
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
                  StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> orders = builder.stream("orders");

        // Yüksek değerli siparişleri filtrele
        KStream<String, String> highValueOrders = orders.filter(
            (orderId, orderJson) -> {
                double amount = parseAmount(orderJson);
                return amount > 500.0;
            }
        );

        highValueOrders.to("high-value-orders");

        // Düşük değerli olanları da loglayalım
        orders.filterNot(
            (orderId, orderJson) -> parseAmount(orderJson) > 500.0
        ).to("standard-orders");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    private static double parseAmount(String orderJson) {
        // Basit JSON parse - production'da ObjectMapper kullanın
        try {
            return Double.parseDouble(
                orderJson.split(""amount":")[1].split("[,}]")[0].trim()
            );
        } catch (Exception e) {
            return 0.0;
        }
    }
}

Bu uygulamayı test etmek için topic’lere veri gönderelim:

# Test verisi üretelim
kafka-console-producer.sh 
  --bootstrap-server localhost:9092 
  --topic orders 
  --property "key.separator=:" 
  --property "parse.key=true" << 'EOF'
order-001:{"orderId":"order-001","amount":750.50,"userId":"user-42"}
order-002:{"orderId":"order-002","amount":120.00,"userId":"user-17"}
order-003:{"orderId":"order-003","amount":1250.00,"userId":"user-99"}
EOF

# Sonuçları izleyelim
kafka-console-consumer.sh 
  --bootstrap-server localhost:9092 
  --topic high-value-orders 
  --from-beginning 
  --property print.key=true

Windowed Aggregation: Gerçek Zamanlı Fraud Detection

Şimdi daha karmaşık bir senaryo: Kullanıcı başına son 5 dakika içindeki toplam harcamayı hesaplayacağız. 2000 TL üzerinde harcama yapan kullanıcıları fraud-alerts topic’ine atacağız.

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> orderAmounts = builder
    .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(orderJson -> parseAmount(orderJson))
    .selectKey((orderId, amount) -> extractUserId(orderJson));

// 5 dakikalık tumbling window ile aggregation
TimeWindowedKStream<String, Double> windowedStream = orderAmounts
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)));

KTable<Windowed<String>, Double> userSpending = windowedStream
    .aggregate(
        () -> 0.0,
        (userId, newAmount, totalAmount) -> totalAmount + newAmount,
        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
            "user-spending-store"
        ).withValueSerde(Serdes.Double())
    );

// 2000 TL üzeri harcamaları fraud alert olarak işaretle
userSpending.toStream()
    .filter((windowedKey, total) -> total > 2000.0)
    .map((windowedKey, total) -> KeyValue.pair(
        windowedKey.key(),
        String.format(
            "{"userId":"%s","total":%.2f,"windowStart":%d}",
            windowedKey.key(),
            total,
            windowedKey.window().start()
        )
    ))
    .to("fraud-alerts");

Burada dikkat edilmesi gereken nokta: Grace period ayarı. Üretim ortamında ağ gecikmeleri ve consumer lag nedeniyle geç gelen mesajlar olabilir. Grace period olmadan bu mesajlar işlenmez, düşürülür.

// Grace period ile daha güvenilir window tanımı
TimeWindows.ofSizeAndGrace(
    Duration.ofMinutes(5),
    Duration.ofSeconds(30)
)

State Store Kullanımı ve Interactive Queries

Kafka Streams’in güçlü özelliklerinden biri state store’lara doğrudan sorgu atabilmek. Bu sayede stream işleme uygulamanız aynı zamanda basit bir sorgulama API’si sunabilir.

// State store'u Materialized olarak tanımla
KTable<String, Long> orderCount = builder
    .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey((k, v) -> extractUserId(v))
    .groupByKey()
    .count(Materialized.as("order-count-store"));

// Streams başlatıldıktan sonra store'a sorgu at
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Store hazır olana kadar bekle
streams.cleanUp();

// REST endpoint'ten veya başka bir yerden store'a eriş
ReadOnlyKeyValueStore<String, Long> store = streams
    .store(StoreQueryParameters.fromNameAndType(
        "order-count-store",
        QueryableStoreTypes.keyValueStore()
    ));

// Belirli kullanıcının sipariş sayısını sorgula
Long count = store.get("user-42");
System.out.println("Kullanıcı user-42'nin sipariş sayısı: " + count);

// Tüm kayıtları iterate et
KeyValueIterator<String, Long> all = store.all();
while (all.hasNext()) {
    KeyValue<String, Long> next = all.next();
    System.out.printf("Kullanıcı: %s, Sipariş: %d%n", 
                      next.key, next.value);
}
all.close();

Stream-Table Join: Kullanıcı Profiliyle Zenginleştirme

Üretim ortamında sıkça ihtiyaç duyduğunuz senaryo: Gelen olayları başka bir veri kaynağıyla zenginleştirmek. Siparişlere kullanıcı bilgilerini ekleyelim.

StreamsBuilder builder = new StreamsBuilder();

// Siparişler stream olarak
KStream<String, String> orders = builder.stream("orders");

// Kullanıcı profilleri GlobalKTable olarak (tüm partition'lardan oku)
GlobalKTable<String, String> userProfiles = builder
    .globalTable("user-profiles",
        Materialized.as("user-profiles-store")
    );

// Join işlemi: userId'ye göre eşleştir
KStream<String, String> enrichedOrders = orders.join(
    userProfiles,
    // Sol taraf (order) için key extractor
    (orderId, orderJson) -> extractUserId(orderJson),
    // Join fonksiyonu
    (orderJson, userProfileJson) -> {
        return String.format(
            "{"order":%s,"userProfile":%s}",
            orderJson,
            userProfileJson
        );
    }
);

enrichedOrders.to("enriched-orders");

// Kullanıcı profili olmayan siparişler için branch
KStream<String, String>[] branches = orders.branch(
    (k, v) -> userProfileExists(v),
    (k, v) -> true
);

branches[1].to("orders-without-profile");

GlobalKTable kullanmanın bir maliyeti var: Tüm veriyi her instance’a çekiyor. Kullanıcı profilleriniz çok büyükse standart KTable ve repartition tercih edin.

Üretim Ortamı Konfigürasyonu

Yerel test ile üretim arasındaki en büyük fark konfigürasyon. İşte gerçek hayatta kullandığım ayarlar:

# Kafka Streams üretim konfigürasyonu
cat > streams-prod.properties << 'EOF'
application.id=fraud-detection-prod
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Performans ayarları
num.stream.threads=4
buffered.records.per.partition=1000
cache.max.bytes.buffering=10485760

# Güvenilirlik
processing.guarantee=exactly_once_v2
replication.factor=3
min.insync.replicas=2

# State store için RocksDB dizini
state.dir=/data/kafka-streams

# Consumer ayarları
auto.offset.reset=latest
fetch.min.bytes=50000
fetch.max.wait.ms=500

# Producer ayarları
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true

# Monitoring
metrics.recording.level=INFO
EOF

Thread sayısı için altın kural: Topic partition sayısını geçme. 8 partition’lı bir topic için 8’den fazla thread açmanın anlamı yok, fazla thread’ler boşta bekler.

Monitoring ve Operasyonel Yönetim

Üretimde Kafka Streams uygulamalarını izlemek için JMX metrikleri altın değerinde:

# JMX üzerinden metrik çek
cat > jmx-monitor.sh << 'EOF'
#!/bin/bash

JMX_HOST="localhost"
JMX_PORT="9999"

# Stream thread durumlarını kontrol et
java -jar jmxterm.jar << JMXEOF
open ${JMX_HOST}:${JMX_PORT}

# Process rate metriği
get -b kafka.streams:type=stream-metrics,client-id=fraud-detection-prod 
    process-rate

# Record lag
get -b kafka.streams:type=stream-task-metrics,* 
    records-lag

# State store boyutu
get -b kafka.streams:type=stream-state-metrics,* 
    rocksdb-size-all-mem-tables
JMXEOF
EOF
chmod +x jmx-monitor.sh

Prometheus ile entegrasyon için JMX Exporter kullanıyoruz:

# JMX Exporter konfigürasyonu
cat > jmx-exporter-config.yml << 'EOF'
rules:
  - pattern: "kafka.streams<type=stream-metrics, client-id=(.+)><>(\.+):"
    name: "kafka_streams_$2"
    labels:
      client_id: "$1"
  
  - pattern: "kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>records-lag"
    name: "kafka_streams_task_records_lag"
    labels:
      client_id: "$1"
      task_id: "$2"

  - pattern: "kafka.streams<type=stream-processor-node-metrics.*><>forward-rate"
    name: "kafka_streams_processor_forward_rate"
EOF

# Uygulamayı JMX Exporter ile başlat
java -javaagent:jmx_prometheus_javaagent.jar=8080:jmx-exporter-config.yml 
     -Dcom.sun.management.jmxremote 
     -Dcom.sun.management.jmxremote.port=9999 
     -jar fraud-detection.jar

Hata Yönetimi ve Uncaught Exception Handler

Üretimde karşılaştığım en sık sorun: Stream thread’inin beklenmedik bir exception nedeniyle ölmesi. Varsayılan davranış uygulamayı durdurmak. Bunu değiştirmeliyiz:

streams.setUncaughtExceptionHandler((exception) -> {
    // Hatayı logla ve alerting sistemine gönder
    logger.error("Stream thread'i beklenmedik hata ile karşılaştı", 
                 exception);
    alertingService.sendAlert(
        "KafkaStreams Thread Failure",
        exception.getMessage()
    );
    
    // Hatanın türüne göre karar ver
    if (exception instanceof SerializationException) {
        // Serialization hatası: mesajı atla, devam et
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    } else if (exception instanceof InvalidStateStoreException) {
        // State store hatası: yeniden başlat
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    } else {
        // Bilinmeyen hata: uygulamayı durdur
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});

// Deserializasyon hatalarında mesajı dead-letter topic'e gönder
props.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class
);

State Store Yönetimi ve Rebalancing

Kafka Streams uygulamalarında en pahalı operasyon state store’ların rebalancing sırasında yeniden yüklenmesi. Bunu minimize etmek için standby replica kullanın:

props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Bu ayarla her state store’un bir yedek kopyası başka bir instance’ta tutulur. Rebalancing sırasında yeni aktif instance sıfırdan restore etmek zorunda kalmaz.

State store boyutunu periyodik kontrol edin:

# RocksDB state store boyutlarını kontrol et
du -sh /data/kafka-streams/fraud-detection-prod/*/
# Çıktı örneği:
# 2.3G   /data/kafka-streams/fraud-detection-prod/0_0/
# 2.1G   /data/kafka-streams/fraud-detection-prod/0_1/

State store çok büyüdüğünde RocksDB compaction ayarlarını tuning etmeniz gerekebilir. Bu ayrı bir yazının konusu ama şunu söyleyeyim: varsayılan RocksDB ayarları yüksek yazma trafiğinde yeterli gelmez.

Deployment Stratejileri

Kafka Streams uygulamalarını rolling update ile deploy edebilirsiniz. Aynı application.id ile birden fazla instance çalıştırıldığında Kafka bunları otomatik koordine eder:

# Kubernetes deployment örneği
cat > kafka-streams-deployment.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-detection
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  template:
    spec:
      containers:
      - name: fraud-detection
        image: myregistry/fraud-detection:1.2.0
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka:9092"
        - name: STREAMS_THREADS
          value: "4"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        volumeMounts:
        - name: state-store
          mountPath: /data/kafka-streams
      volumes:
      - name: state-store
        persistentVolumeClaim:
          claimName: streams-state-pvc
EOF

State store için PersistentVolume kullanmanız kritik önem taşıyor. Pod yeniden başladığında state’i sıfırdan oluşturmak yerine diskten okuyacak, bu başlangıç süresini önemli ölçüde kısaltıyor.

Sonuç

Kafka Streams, doğru anlaşıldığında gerçekten güçlü bir araç. Ayrı bir cluster yönetme derdi olmadan stream processing yapabilmek, özellikle küçük ve orta ölçekli ekipler için büyük avantaj. Ancak şunları aklınızda tutun:

  • State store boyutları ve RocksDB yönetimi göz ardı edilmez. Üretimde mutlaka monitor edin.
  • Exactly-once semantics performans maliyeti getirir. Gerçekten ihtiyacınız var mı, değerlendirin.
  • Rebalancing süreleri büyük state store’larla uzayabilir. Standby replica bu acıyı hafifletir.
  • Windowing semantiğini, özellikle grace period kavramını iyi anlayın. Geç gelen mesajlar sessizce düşer.
  • Thread sayısını partition sayısıyla sınırlı tutun.

Kafka Streams’i ciddiye almak istiyorsanız “Kafka: The Definitive Guide” kitabının streams bölümlerine ve Confluent’in resmi dokümantasyonuna mutlaka başvurun. Burada aktardıklarım üretim deneyimimin özeti, ama her sistemin kendi karakteri var ve keşfetmesi gereken kendi köşeleri.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir