Kafka Connect ile Veri Entegrasyonu ve Kullanım Örnekleri

Veri entegrasyonu meselesine gelince, çoğu ekip ya custom script’ler yazıp bunları cron job’larla çalıştırıyor, ya da enterprise ETL araçlarına ciddi lisans bedelleri ödüyor. Kafka Connect ise tam bu ikisi arasındaki boşluğu dolduran, production ortamında gerçekten güvenilir bir çözüm. Birkaç yıldır farklı ölçeklerde Kafka cluster’larını yönetirken öğrendiklerimi burada paylaşmaya çalışacağım.

Kafka Connect Nedir ve Neden Önemlidir?

Kafka Connect, Apache Kafka ekosisteminin bir parçası olarak gelen, veri kaynaklarından Kafka’ya (Source Connector) veya Kafka’dan hedef sistemlere (Sink Connector) veri taşımak için tasarlanmış bir framework. Aslında temelde yaptığı şey basit: veri üretip Kafka topic’lerine yazan ya da bu topic’leri okuyup başka sistemlere yazan worker process’ler çalıştırmak.

Ama işin güzel yanı şu: kendi worker’ınızı sıfırdan yazmak zorunda değilsiniz. Confluent Hub’da yüzlerce hazır connector var. PostgreSQL, MySQL, Elasticsearch, S3, HDFS, MongoDB, Salesforce… Aklınıza gelen neredeyse her sistem için bir connector bulabilirsiniz.

Standalone vs Distributed mod konusunu baştan netleştirelim:

  • Standalone mod: Tek bir process üzerinde çalışır, geliştirme ve test için idealdir. Offset’ler local dosyada saklanır.
  • Distributed mod: Birden fazla worker üzerinde çalışır, yük dağıtımı ve fault tolerance sağlar. Production için bu modu kullanın.

Kurulum ve Temel Konfigürasyon

Kafka’nın zaten kurulu olduğunu varsayıyorum. Kafka Connect, Kafka dağıtımıyla birlikte geliyor, ayrı bir kurulum gerekmez. Ama connector plugin’leri için biraz iş yapmanız gerekecek.

# Kafka dizin yapısını kontrol edelim
ls /opt/kafka/
# bin  config  libs  logs  plugins

# Plugin dizini oluşturun (yoksa)
mkdir -p /opt/kafka/plugins

# Confluent Hub CLI ile connector yükleyin
# Önce Confluent Hub CLI'yi indirin
curl -O https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
tar xzf confluent-hub-client-latest.tar.gz -C /opt/

# JDBC Connector yükleyin
/opt/confluent-hub/bin/confluent-hub install confluentinc/kafka-connect-jdbc:latest 
  --component-dir /opt/kafka/plugins 
  --worker-configs /opt/kafka/config/connect-distributed.properties

Distributed mod için worker konfigürasyonunu düzenleyin:

# /opt/kafka/config/connect-distributed.properties
cat > /opt/kafka/config/connect-distributed.properties << 'EOF'
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

offset.flush.interval.ms=10000

# Plugin path - önemli!
plugin.path=/opt/kafka/plugins

# REST API portu
rest.port=8083
rest.host.name=0.0.0.0

# Worker heap size için JVM opts ayrıca ayarlanmalı
EOF

Worker’ı başlatmadan önce gerekli topic’lerin oluşturulduğundan emin olun:

# Internal topic'leri oluşturun
kafka-topics.sh --create 
  --bootstrap-server kafka1:9092 
  --topic connect-offsets 
  --partitions 25 
  --replication-factor 3 
  --config cleanup.policy=compact

kafka-topics.sh --create 
  --bootstrap-server kafka1:9092 
  --topic connect-configs 
  --partitions 1 
  --replication-factor 3 
  --config cleanup.policy=compact

kafka-topics.sh --create 
  --bootstrap-server kafka1:9092 
  --topic connect-status 
  --partitions 5 
  --replication-factor 3 
  --config cleanup.policy=compact

# Worker'ı başlat
/opt/kafka/bin/connect-distributed.sh 
  /opt/kafka/config/connect-distributed.properties &

REST API ile Connector Yönetimi

Kafka Connect’in en sevdiğim yanlarından biri REST API’si. Herhangi bir şeyi yeniden başlatmadan connector ekleyip çıkarabilir, durumlarını kontrol edebilirsiniz.

# Mevcut connector'ları listele
curl -s http://connect-host:8083/connectors | python3 -m json.tool

# Cluster bilgisini al
curl -s http://connect-host:8083/ | python3 -m json.tool

# Yüklü plugin'leri listele
curl -s http://connect-host:8083/connector-plugins | python3 -m json.tool

# Connector durumunu kontrol et
curl -s http://connect-host:8083/connectors/my-connector/status | python3 -m json.tool

# Connector'ı yeniden başlat (task seviyesinde)
curl -X POST http://connect-host:8083/connectors/my-connector/tasks/0/restart

# Connector'ı sil
curl -X DELETE http://connect-host:8083/connectors/my-connector

Gerçek Dünya Senaryosu 1: PostgreSQL CDC ile Sipariş Takibi

E-ticaret projelerinde sıkça karşılaştığım senaryo: PostgreSQL’deki orders tablosundaki değişiklikleri gerçek zamanlı olarak başka servislere iletmek. Debezium’un PostgreSQL connector’ı ile Change Data Capture (CDC) yapabiliyorsunuz.

Önce PostgreSQL tarafını hazırlayın:

-- PostgreSQL'de logical replication açın (postgresql.conf)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Replication için kullanıcı oluşturun
CREATE USER debezium_user WITH REPLICATION LOGIN PASSWORD 'guclu_sifre';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;

-- Publication oluşturun
CREATE PUBLICATION dbz_publication FOR TABLE orders, order_items;

Şimdi connector’ı tanımlayın:

curl -X POST http://connect-host:8083/connectors 
  -H "Content-Type: application/json" 
  -d '{
    "name": "postgres-orders-cdc",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres-primary.internal",
      "database.port": "5432",
      "database.user": "debezium_user",
      "database.password": "guclu_sifre",
      "database.dbname": "ecommerce",
      "database.server.name": "ecommerce-db",
      "plugin.name": "pgoutput",
      "publication.name": "dbz_publication",
      "table.include.list": "public.orders,public.order_items",
      "topic.prefix": "cdc",
      "decimal.handling.mode": "double",
      "time.precision.mode": "connect",
      "heartbeat.interval.ms": "10000",
      "slot.name": "debezium_orders_slot",
      "snapshot.mode": "initial",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite"
    }
  }'

Bu konfigürasyonla cdc.public.orders ve cdc.public.order_items topic’leri otomatik oluşacak. INSERT, UPDATE, DELETE operasyonlarının hepsi yakalanacak.

Dikkat edilmesi gereken noktar: slot.name benzersiz olmalı. Aynı slot’u iki farklı connector paylaşırsa veriler kaybolur. Bunu production’da bir kez yaşadık, o günden beri naming convention’ımıza slot adını da ekledik.

Gerçek Dünya Senaryosu 2: Elasticsearch Sink ile Log Analizi

Uygulama loglarını Kafka’ya akttıktan sonra Elasticsearch’e göndermek çok yaygın bir use case. Kibana dashboard’ları için mükemmel çalışıyor.

curl -X POST http://connect-host:8083/connectors 
  -H "Content-Type: application/json" 
  -d '{
    "name": "elasticsearch-logs-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "3",
      "topics": "app-logs,nginx-logs,audit-logs",
      "connection.url": "https://elasticsearch.internal:9200",
      "connection.username": "elastic",
      "connection.password": "es_sifre",
      "type.name": "_doc",
      "key.ignore": "true",
      "schema.ignore": "true",
      "behavior.on.malformed.documents": "warn",
      "behavior.on.null.values": "ignore",
      "batch.size": "2000",
      "linger.ms": "1000",
      "flush.timeout.ms": "10000",
      "max.retries": "5",
      "retry.backoff.ms": "100",
      "elastic.security.protocol": "SSL",
      "transforms": "addTimestamp",
      "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.addTimestamp.timestamp.field": "ingest_timestamp"
    }
  }'

batch.size ve linger.ms parametreleri performance açısından kritik. Çok küçük batch boyutu Elasticsearch’i boğar, çok büyük bırakırsanız bellek sorunları çıkabilir. Ben 1000-5000 arasını tercih ediyorum, sisteminizin kapasitesine göre ayarlayın.

SMT (Single Message Transforms) Kullanımı

Connector konfigürasyonunda transforms parametresi çok işe yarıyor. Veriyi consumer tarafında işlemek yerine, Kafka Connect pipeline’ında dönüştürebilirsiniz.

# Örnek: Hassas alanları maskele, topic routing yap
curl -X POST http://connect-host:8083/connectors 
  -H "Content-Type: application/json" 
  -d '{
    "name": "mysql-users-source",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql.internal",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "sifre",
      "database.server.id": "184054",
      "topic.prefix": "mysql",
      "database.include.list": "userdb",
      "table.include.list": "userdb.users",
      "schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
      "schema.history.internal.kafka.topic": "schema-changes.userdb",
      "transforms": "maskCC,maskPhone,route",
      "transforms.maskCC.type": "org.apache.kafka.connect.transforms.MaskField$Value",
      "transforms.maskCC.fields": "credit_card_number,ssn",
      "transforms.maskCC.replacement": "****",
      "transforms.maskPhone.type": "org.apache.kafka.connect.transforms.MaskField$Value",
      "transforms.maskPhone.fields": "phone_number",
      "transforms.maskPhone.replacement": "MASKED",
      "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.route.regex": "mysql.userdb.(.*)",
      "transforms.route.replacement": "processed-users-$1"
    }
  }'

SMT’lerin zincir halinde uygulandığını unutmayın. transforms listesindeki sıra önemli. Yukarıdaki örnekte önce maskeleme yapılıyor, sonra topic routing.

Monitoring ve Sorun Giderme

Production’da en sık karşılaştığım sorunlar ve çözümleri:

Connector FAILED durumuna düştüğünde:

#!/bin/bash
# connect-health-check.sh
CONNECT_HOST="http://connect-host:8083"

# Tüm connector'ların durumunu kontrol et
connectors=$(curl -s $CONNECT_HOST/connectors)

for connector in $(echo $connectors | python3 -c "import sys,json; print('n'.join(json.load(sys.stdin)))"); do
    status=$(curl -s $CONNECT_HOST/connectors/$connector/status)
    state=$(echo $status | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['connector']['state'])")
    
    if [ "$state" != "RUNNING" ]; then
        echo "UYARI: $connector durumu $state"
        # Task seviyesini de kontrol et
        echo $status | python3 -c "
import sys,json
d=json.load(sys.stdin)
for task in d.get('tasks', []):
    if task['state'] != 'RUNNING':
        print(f'  Task {task["id"]}: {task["state"]} - {task.get("trace", "no trace")}')
"
        # Otomatik restart dene
        echo "Restart deneniyor: $connector"
        curl -X POST $CONNECT_HOST/connectors/$connector/restart
    fi
done

JMX metrikleri için worker başlatma:

# /opt/kafka/bin/connect-distributed-start.sh
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
  -Dcom.sun.management.jmxremote.port=9999 
  -Dcom.sun.management.jmxremote.authenticate=false 
  -Dcom.sun.management.jmxremote.ssl=false"

export KAFKA_HEAP_OPTS="-Xms1g -Xmx4g"

/opt/kafka/bin/connect-distributed.sh 
  /opt/kafka/config/connect-distributed.properties

Prometheus ile metrikleri toplamak istiyorsanız JMX Exporter’ı kullanın. kafka.connect:type=connector-task-metrics altındaki metrikler özellikle değerli: source-record-poll-rate, sink-record-read-rate, offset-commit-success-percentage bunların başında geliyor.

S3 Sink ile Veri Arşivleme

Uzun vadeli saklama ve batch analitik için Kafka topic’lerini S3’e dökmek çok yaygın. Maliyet optimizasyonu açısından da mantıklı.

curl -X POST http://connect-host:8083/connectors 
  -H "Content-Type: application/json" 
  -d '{
    "name": "s3-archive-sink",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "4",
      "topics": "transactions,events,audit-logs",
      "s3.region": "eu-central-1",
      "s3.bucket.name": "my-data-archive",
      "s3.part.size": "67108864",
      "s3.compression.type": "gzip",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "parquet.codec": "snappy",
      "flush.size": "100000",
      "rotate.interval.ms": "3600000",
      "rotate.schedule.interval.ms": "3600000",
      "locale": "tr_TR",
      "timezone": "Europe/Istanbul",
      "timestamp.extractor": "RecordField",
      "timestamp.field": "created_at",
      "path.format": "'"'"'year'"'"'=YYYY/'"'"'month'"'"'=MM/'"'"'day'"'"'=dd/'"'"'hour'"'"'=HH",
      "locale": "en_US",
      "topics.dir": "raw-data",
      "aws.access.key.id": "${file:/opt/kafka/secrets/aws.properties:access.key}",
      "aws.secret.access.key": "${file:/opt/kafka/secrets/aws.properties:secret.key}"
    }
  }'

Parquet format tercihim şuradan geliyor: Athena ile direkt sorgulayabiliyorsunuz, sıkıştırma oranı JSON’a göre çok daha iyi, ve kolon bazlı okuma Spark job’larını ciddi hızlandırıyor.

Credentials yönetimi konusunda: Şifreleri direkt JSON’a yazmak yerine external secrets kullanın. Yukarıdaki ${file:...} syntax’ı tam bunun için. Vault integration da mümkün ama kurulumu daha karmaşık.

Yaygın Hatalar ve Çözümleri

Sahada en çok karşılaştığım sorunları ve çözüm yollarını sıralayayım:

  • Converter uyumsuzluğu: Source connector JSON formatında yazıyor, sink connector Avro bekliyor. key.converter ve value.converter her connector için ayrı ayrı override edilebilir, global ayara güvenmeyin.
  • Offset commit gecikmesi: offset.flush.interval.ms çok yüksek ayarlanmışsa, worker restart durumunda veri tekrar işlenebilir. Kritik sistemlerde 5000ms ve altı tercih edin.
  • Rebalancing fırtınası: Çok sayıda connector ve task çalışırken bir worker düştüğünde, diğer worker’lar üzerindeki yük aniden artar. scheduled.rebalance.max.delay.ms parametresini ayarlayarak geçici outage’larda gereksiz rebalance’ı engelleyebilirsiniz.
  • Dead Letter Queue kurulumu: Hatalı mesajların pipeline’ı durdurmaması için mutlaka DLQ konfigürasyonu yapın:
# Herhangi bir sink connector'a eklenebilir
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-connector-name",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
  • Schema Registry olmadan Avro kullanmaya çalışmak: Bu hata genellikle yeni başlayanların düştüğü bir tuzak. Avro serialization için Confluent Schema Registry zorunlu.
  • Task sayısını gereğinden fazla artırmak: Her task bir thread demek. Veritabanı connector’larında task sayısını artırmak her zaman performansı artırmaz, kaynak tüketimini artırır. Önce mevcut task’ların verimini ölçün.

Sonuç

Kafka Connect, veri entegrasyonu için gerçekten güçlü bir araç. Ancak her araç gibi, doğru kullanım pratiği önemli. Benim tavsiyelerim şöyle özetlenebilir:

Production’a geçmeden önce connector konfigürasyonlarınızı versiyon kontrolüne alın. Her değişikliği elle REST API’ye gönderip unutmak yerine, GitOps yaklaşımıyla yönetin. Kafka Connect için Terraform provider’ları bile mevcut artık.

Monitoring’e baştan yatırım yapın. Connector durumlarını sadece log’lardan takip etmek yetmez. Prometheus + Grafana ile bir dashboard kurun, kritik metrikler için alert tanımlayın. Bir connector’ın saatlerdir data üretmediğini ancak sabah toplantısında öğrenmek istemezsiniz.

Schema yönetimini ciddiye alın. CDC senaryolarında upstream tabloya kolon eklendiğinde ne olacağını önceden düşünün. Debezium bunu genellikle iyi handle eder ama downstream sistemleriniz her zaman o kadar affedici olmayabilir.

Son olarak, Kafka Connect’i her şey için kullanmaya çalışmayın. Basit, düşük hacimli entegrasyonlar için bazen bir Python script gerçekten daha pratik. Kafka Connect’in gerçek değeri, yüksek hacimli, sürekli akan veri senaryolarında, fault tolerance ve at-least-once garantisi gerektiren durumlarında ortaya çıkıyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir