ksqlDB ile Kafka Üzerinde SQL Sorguları Nasıl Çalıştırılır?

Kafka’ya SQL sorguları yazmak kulağa biraz tuhaf gelebilir, ama ksqlDB tam olarak bunu mümkün kılıyor. Event streaming dünyasına SQL’in sadeliğini getiren bu araç, özellikle veri mühendisliği arka planı olmayan sistem yöneticileri için gerçek bir oyun değiştirici. Ben bu konuya ilk baktığımda “neden daha önce yoktu bu?” diye düşünmeden edemedim.

ksqlDB Nedir ve Ne Zaman Kullanılır?

ksqlDB, Confluent tarafından geliştirilen, Kafka üzerinde gerçek zamanlı stream processing yapmanızı sağlayan bir veritabanı sistemidir. Temelde Apache Kafka Streams kütüphanesinin üzerine inşa edilmiştir, ama sizi Java kodu yazmaktan kurtarır. Bunun yerine bildik SQL sözdizimini kullanırsınız.

Tipik kullanım senaryoları şunlardır:

  • Gerçek zamanlı anomali tespiti: Bir e-ticaret sisteminde dakika başına 100’den fazla sipariş veren bir kullanıcıyı anında tespit etmek
  • Stream enrichment: İki farklı topic’i join’leyerek zenginleştirilmiş veri akışı oluşturmak
  • Aggregation: Belirli zaman pencerelerinde metrik toplama işlemleri
  • Filtreleme ve yönlendirme: Belirli koşulları sağlayan event’leri farklı topic’lere yönlendirmek

ksqlDB’yi bir “Kafka üzerindeki materialized view motoru” olarak da düşünebilirsiniz. Sorgularınız sürekli çalışır ve Kafka’ya gelen yeni verilerle kendini günceller.

Kurulum

Docker ile Hızlı Başlangıç

Geliştirme ortamı için en pratik yol Docker Compose kullanmaktır. Aşağıdaki konfigürasyon, tam çalışan bir ortam kurar:

cat > docker-compose.yml << 'EOF'
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.4.0
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.4.0
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
EOF

docker-compose up -d

Servisler ayağa kalktıktan sonra ksqlDB CLI’a bağlanabilirsiniz:

docker exec -it $(docker ps -q -f name=ksqldb-cli) ksql http://ksqldb-server:8088

Production Ortamı için Bağımsız Kurulum

Production’da ksqlDB server’ı ayrı bir node üzerinde çalıştırmak daha mantıklıdır. Confluent Platform paketinden kurulum yapıyorsanız:

# Confluent repository ekleme (RHEL/CentOS için)
sudo rpm --import https://packages.confluent.io/rpm/7.4/archive.key
sudo tee /etc/yum.repos.d/confluent.repo << 'EOF'
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/7.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/7.4/archive.key
enabled=1
EOF

sudo yum install -y confluent-ksqldb

# Konfigürasyon
sudo tee /etc/ksqldb/ksql-server.properties << 'EOF'
listeners=http://0.0.0.0:8088
bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
ksql.service.id=production_ksqldb_
ksql.streams.replication.factor=3
ksql.logging.processing.stream.auto.create=true
ksql.logging.processing.topic.auto.create=true
ksql.internal.topic.replicas=3
EOF

sudo systemctl enable confluent-ksqldb
sudo systemctl start confluent-ksqldb

Stream ve Table Kavramları

ksqlDB’de iki temel veri yapısı vardır ve bunları birbirinden ayırt etmek kritik önem taşır.

STREAM: Sonsuz, değişmez event dizisi. Her yeni kayıt bir event’tir ve eski kayıtlar silinmez. Örnek: sipariş işlemleri, log kayıtları, kullanıcı tıklamaları.

TABLE: Bir anahtara göre en güncel durumu tutan yapı. Aynı key geldikçe eski değer güncellenir. Örnek: kullanıcı profilleri, ürün stokları, cihaz durumları.

İlk Stream ve Table Oluşturma

CLI’a bağlandıktan sonra önce mevcut topic’leri görelim:

-- ksqlDB CLI içinde
SHOW TOPICS;
SET 'auto.offset.reset' = 'earliest';

-- Sipariş stream'i oluşturma
CREATE STREAM siparisler (
  siparis_id VARCHAR KEY,
  musteri_id VARCHAR,
  urun_id VARCHAR,
  miktar INT,
  fiyat DOUBLE,
  durum VARCHAR,
  islem_zamani TIMESTAMP
) WITH (
  KAFKA_TOPIC='siparisler-topic',
  VALUE_FORMAT='JSON',
  PARTITIONS=6,
  REPLICAS=3
);

-- Ürün tablosu oluşturma (compacted topic üzerinden)
CREATE TABLE urunler (
  urun_id VARCHAR PRIMARY KEY,
  urun_adi VARCHAR,
  kategori VARCHAR,
  stok INT,
  birim_fiyat DOUBLE
) WITH (
  KAFKA_TOPIC='urunler-topic',
  VALUE_FORMAT='JSON',
  PARTITIONS=6
);

Gerçek Dünya Senaryoları

Senaryo 1: Yüksek Değerli Siparişleri Tespit Etme

Bir e-ticaret sisteminde 1000 TL üzeri siparişleri ayrı bir topic’e yönlendirmek istiyorsunuz. Bu siparişler için anında bildirim gönderilecek:

CREATE STREAM yuksek_degerli_siparisler
  WITH (KAFKA_TOPIC='yuksek-degerli-siparisler', VALUE_FORMAT='JSON')
AS SELECT
  siparis_id,
  musteri_id,
  urun_id,
  miktar,
  fiyat,
  'YUKSEK_DEGER' AS etiket,
  ROWTIME AS event_time
FROM siparisler
WHERE fiyat * miktar > 1000
EMIT CHANGES;

Bu sorgu sürekli çalışır. Yani siparisler topic’ine gelen her yeni mesajı değerlendirir ve koşulu sağlayanları yuksek-degerli-siparisler topic’ine yazar. Herhangi bir uygulama bu topic’i dinleyerek anlık bildirim gönderebilir.

Senaryo 2: Dakikalık Aggregation ile Anlık Dashboard

Monitoring sistemleri için yaygın bir ihtiyaç: Son 5 dakikada kategori bazında toplam satış tutarını hesaplamak.

-- 5 dakikalık tumbling window ile agregasyon
CREATE TABLE kategori_satis_ozeti
  WITH (KAFKA_TOPIC='kategori-satis-ozeti', VALUE_FORMAT='JSON')
AS SELECT
  u.kategori,
  COUNT(*) AS siparis_sayisi,
  SUM(s.fiyat * s.miktar) AS toplam_tutar,
  AVG(s.fiyat * s.miktar) AS ortalama_tutar,
  WINDOWSTART AS pencere_baslangic,
  WINDOWEND AS pencere_bitis
FROM siparisler s
LEFT JOIN urunler u ON s.urun_id = u.urun_id
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY u.kategori
EMIT FINAL;

EMIT FINAL burada önemli. EMIT CHANGES yetersiz sonuçlar dahil sürekli güncelleme gönderir, EMIT FINAL ise pencere kapandığında sadece son durumu gönderir. Dashboard için ikincisi genellikle daha temizdir.

Senaryo 3: Fraud Tespit Sistemi

Bu senaryo benim en çok kullandığım örnek. Bir kullanıcı 10 dakika içinde 5’ten fazla sipariş veriyorsa şüpheli işaret etmek istiyoruz:

-- Şüpheli aktivite tespiti
CREATE TABLE suphelie_aktivite
  WITH (KAFKA_TOPIC='suphelie-aktivite-alarms', VALUE_FORMAT='JSON')
AS SELECT
  musteri_id,
  COUNT(*) AS siparis_sayisi,
  SUM(fiyat * miktar) AS toplam_harcama,
  WINDOWSTART AS pencere_baslangic
FROM siparisler
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY musteri_id
HAVING COUNT(*) > 5
EMIT CHANGES;

Bu query production’da çalıştırdığımda ilk hafta içinde gerçekten bot davranışı sergileyen 3 farklı hesap tespit ettik. Kurumun fraud ekibi çok memnun oldu.

Senaryo 4: Stream-Stream Join ile Event Correlation

Sipariş verildiğinde ve ödeme onaylandığında iki ayrı event geldiğini düşünelim. Bunları birleştirerek tam resmi görmek istiyoruz:

-- Ödeme event stream'i
CREATE STREAM odemeler (
  odeme_id VARCHAR KEY,
  siparis_id VARCHAR,
  odeme_tutari DOUBLE,
  odeme_yontemi VARCHAR,
  odeme_zamani TIMESTAMP
) WITH (
  KAFKA_TOPIC='odemeler-topic',
  VALUE_FORMAT='JSON'
);

-- Stream-stream join: 30 dakika içinde eşleşen sipariş-ödeme ikilisi
CREATE STREAM tamamlanan_siparisler
  WITH (KAFKA_TOPIC='tamamlanan-siparisler', VALUE_FORMAT='JSON')
AS SELECT
  s.siparis_id,
  s.musteri_id,
  s.urun_id,
  s.fiyat * s.miktar AS siparis_tutari,
  o.odeme_tutari,
  o.odeme_yontemi,
  s.islem_zamani AS siparis_zamani,
  o.odeme_zamani
FROM siparisler s
JOIN odemeler o WITHIN 30 MINUTES GRACE PERIOD 5 MINUTES
ON s.siparis_id = o.siparis_id
EMIT CHANGES;

Stream-stream join’lerde WITHIN penceresi kritiktir. Ödeme ağ gecikmelerini hesaba katarak pencereyi biraz geniş tutun, ancak sonsuz büyümemesi için de makul bir üst limit koyun.

Push Query ve Pull Query Ayrımı

ksqlDB’de iki tür sorgu modu vardır ve hangisini kullandığınız performans açısından büyük fark yaratır.

Push Query (EMIT CHANGES ile): Sonuçlar sürekli akar, subscription gibi çalışır. Yeni veri geldikçe client’a gönderilir. Streaming pipeline’lar için kullanın.

Pull Query: Anlık snapshot alır, sonra bağlantıyı kapatır. Geleneksel SQL gibi davranır. REST API üzerinden state store’dan okuma yapar.

# Pull query - REST API üzerinden
curl -X POST http://localhost:8088/query 
  -H "Content-Type: application/vnd.ksql.v1+json" 
  -d '{
    "ksql": "SELECT * FROM kategori_satis_ozeti WHERE kategori = '''Elektronik''';",
    "streamsProperties": {}
  }'

# Push query - sürekli dinleme
curl -X POST http://localhost:8088/query-stream 
  -H "Content-Type: application/vnd.ksql.v1+json" 
  -d '{
    "sql": "SELECT * FROM siparisler EMIT CHANGES;",
    "properties": {"auto.offset.reset": "latest"}
  }'

Connector Entegrasyonu

ksqlDB, Kafka Connect ile entegre çalışır. Bir veritabanından veri çekip doğrudan stream’e bağlayabilirsiniz:

-- PostgreSQL'den veri çeken bir source connector oluşturma
CREATE SOURCE CONNECTOR musteri_connector WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:postgresql://db-server:5432/eticaret',
  'connection.user' = 'kafka_user',
  'connection.password' = 'gizli_sifre',
  'table.whitelist' = 'musteriler',
  'mode' = 'timestamp+incrementing',
  'timestamp.column.name' = 'guncelleme_zamani',
  'incrementing.column.name' = 'id',
  'topic.prefix' = 'pg-',
  'poll.interval.ms' = '5000'
);

-- Connector durumunu kontrol et
SHOW CONNECTORS;
DESCRIBE CONNECTOR musteri_connector;

Performans ve Operasyonel İpuçları

Birkaç ay production’da ksqlDB koşturduktan sonra öğrendiğim bazı kritik noktalar:

State Store boyutunu izleyin: ksqlDB, table’lar ve windowed aggregation’lar için RocksDB tabanlı local state store kullanır. Disk dolunca sürprizle karşılaşabilirsiniz.

# ksqlDB server JVM metriklerini kontrol
curl -s http://localhost:8088/v1/metadata | python3 -m json.tool

# Query durumlarını listele
curl -s http://localhost:8088/ksql 
  -H "Content-Type: application/vnd.ksql.v1+json" 
  -d '{"ksql": "SHOW QUERIES;"}'

Partition sayısına dikkat edin: ksqlDB’deki paralellik Kafka partition sayısıyla doğrudan ilişkilidir. JOIN yapıyorsanız join’lenen topic’lerin aynı partition sayısına sahip olması performansı artırır, aksi halde repartitioning overhead’i oluşur.

Query’leri açıklamadan önce EXPLAIN kullanın:

EXPLAIN
  SELECT musteri_id, COUNT(*) as siparis_sayisi
  FROM siparisler
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY musteri_id
  EMIT CHANGES;

Bu komut execution plan’ı gösterir ve özellikle STATE STORE kullanımını görmek için değerlidir.

Sorunlu query’leri restart etme:

# Tüm query'leri listele
curl -s http://localhost:8088/ksql 
  -H "Content-Type: application/vnd.ksql.v1+json" 
  -d '{"ksql": "SHOW QUERIES EXTENDED;"}' | python3 -m json.tool

# Belirli bir query'yi durdur
curl -X POST http://localhost:8088/ksql 
  -H "Content-Type: application/vnd.ksql.v1+json" 
  -d '{"ksql": "TERMINATE CSAS_YUKSEK_DEGERLI_SIPARISLER_0;"}'

Yaygın Hatalar ve Çözümleri

Production’da sıkça karşılaştığım sorunlar:

Schema uyumsuzluğu: Kafka topic’e farklı formatlarda mesaj gönderiliyorsa stream oluşturma başarısız olur veya query hata verir. Schema Registry kullanmak bu sorunu büyük ölçüde önler.

Late arriving events: Gerçek dünyada event’ler her zaman zamanında gelmez. GRACE PERIOD parametresini unutmayın:

CREATE TABLE saatlik_ozet
  WITH (KAFKA_TOPIC='saatlik-ozet', VALUE_FORMAT='JSON')
AS SELECT
  musteri_id,
  COUNT(*) AS islem_sayisi,
  SUM(fiyat * miktar) AS toplam
FROM siparisler
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 15 MINUTES)
GROUP BY musteri_id
EMIT FINAL;

Memory baskısı: Büyük windowed join’lerde JVM heap dolabilir. ksqlDB server için heap ayarını kontrol edin:

# ksqlDB heap ayarı (production için en az 8GB önerilen)
export KSQL_HEAP_OPTS="-Xms4g -Xmx8g"

# Ya da systemd service dosyasında
sudo systemctl edit confluent-ksqldb
# [Service] bölümüne ekleyin:
# Environment="KSQL_HEAP_OPTS=-Xms4g -Xmx8g"

Sonuç

ksqlDB, Kafka ekosisteminde önemli bir boşluğu dolduruyor. Daha önce Kafka Streams ile Java kodu yazarak yapılan şeylerin çoğunu artık SQL ile yapabiliyorsunuz. Bu, ekibinizdeki veri analistlerinin ve uygulama geliştiricilerin de stream processing’e dahil olabilmesi anlamına geliyor, sadece streaming uzmanlığı olan mühendislere bağımlı kalmak zorunda değilsiniz.

Ancak bazı gerçeklerin farkında olun: ksqlDB hala olgunlaşmakta olan bir teknoloji. Karmaşık multi-hop join’lerde ve yüksek kardinaliteli aggregation’larda kaynak tüketimi beklenenden fazla olabiliyor. Production’a almadan önce yük testinizi mutlaka yapın ve state store boyutlarını izlemeye alın.

Başlangıç noktası olarak küçük bir use case seçin. Örneğin bir log stream’i üzerinde filtreleme yapın, sonra aşamalı olarak daha karmaşık senaryolara geçin. ksqlDB’nin güzelliği, deneme-yanılma maliyetinin düşük olması: bir sorgu işe yaramıyorsa terminate edip yeniden yazıyorsunuz, tüm bir streaming uygulamasını refactor etmek zorunda kalmıyorsunuz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir