Kafka MirrorMaker ile Cluster Replikasyonu: Adım Adım Rehber

Üretim ortamında Kafka cluster’ları arasında veri replikasyonu yapmak zorunda kaldığınızda, MirrorMaker kaçınılmaz bir isim olarak karşınıza çıkar. Yıllar önce ilk MirrorMaker kurulumumu yaparken tam anlamıyla bir “trial and error” sürecinden geçtim. Dokümantasyon yetersiz, community forum cevapları çelişkili, ve production ortamında bir şeyler ters gidince alarm zilleri çalmaya başlıyordu. Bu yazıda o deneyimlerden damıttığım bilgileri, gerçek senaryolarla birlikte aktarmaya çalışacağım.

MirrorMaker Nedir ve Ne Zaman Kullanılır?

Apache Kafka’nın kendi ekosistemi içinde gelen MirrorMaker, aslında özünde bir consumer ve producer kombinasyonudur. Kaynak cluster’dan consume ettiği mesajları, hedef cluster’a produce eder. Kulağa basit geliyor, değil mi? Ama bu basit tanım arkasında ciddi konfigürasyon detayları ve tuzaklar yatıyor.

MirrorMaker’ı şu senaryolarda kullanırsınız:

  • Disaster Recovery (DR) kurulumu: İstanbul’daki primary cluster’ınızın Ankara’daki DR cluster’ına sürekli replike edilmesi
  • Geo-distribution: Farklı bölgelerdeki ekiplerin local latency ile Kafka’ya erişmesi
  • Cloud Migration: On-premise’den cloud’a geçiş sürecinde paralel çalışma
  • Cluster Yükseltme: Sıfır downtime ile Kafka versiyonu yükseltme
  • Data Aggregation: Birden fazla regional cluster’ın merkezi bir cluster’da toplanması

MirrorMaker 1 ile MirrorMaker 2 arasındaki fark çok önemli. Kafka 2.4 ile birlikte gelen MirrorMaker 2 (MM2), Kafka Connect framework’ü üzerine inşa edilmiş ve MM1’in ciddi eksikliklerini gideriyor. Offset senkronizasyonu, topic ACL replikasyonu, ve dinamik konfigürasyon bunların başında geliyor. Bu yazıda ağırlıklı olarak MM2 üzerinde duracağım, ancak MM1 konfigürasyonuna da değineceğim çünkü legacy ortamlarda hala karşınıza çıkıyor.

Ortam Hazırlığı ve Ön Gereksinimler

Önce elimizdeki ortamı netleştirelim. Bu yazı boyunca iki Kafka cluster’ımız olduğunu varsayacağız:

  • Source cluster: kafka-source:9092 (Primary, İstanbul DC)
  • Target cluster: kafka-target:9092 (DR, Ankara DC)

Her iki tarafta da Kafka kurulu ve çalışıyor olmalı. MirrorMaker’ı source cluster’ın yanına, hedef cluster’ın yanına, ya da tamamen bağımsız bir makineye kurabilirsiniz. Ben genellikle bağımsız bir makineye kurmayı tercih ediyorum çünkü kaynak cluster’a ekstra yük bindirmeden izole bir şekilde yönetmek daha temiz oluyor.

Network tarafında dikkat etmeniz gereken birkaç nokta var:

  • MirrorMaker’ın kurulu olduğu makine, her iki cluster’ın broker’larına TCP olarak erişebilmeli
  • Güvenlik duvarı kurallarında Kafka port’larına (genellikle 9092, SSL için 9093) izin verilmeli
  • Yüksek throughput senaryolarında network bandwidth hesabı mutlaka yapılmalı

MirrorMaker 2 Konfigürasyonu

MM2, connect-mirror-maker.properties dosyası üzerinden konfigüre edilir. Temel bir kurulum için şu yapıyı kullanabilirsiniz:

# /opt/kafka/config/mirror-maker.properties

# Cluster alias'larını tanımla
clusters = source, target

# Source cluster bağlantı bilgileri
source.bootstrap.servers = kafka-source-1:9092,kafka-source-2:9092,kafka-source-3:9092

# Target cluster bağlantı bilgileri
target.bootstrap.servers = kafka-target-1:9092,kafka-target-2:9092,kafka-target-3:9092

# Hangi topic'leri replike edeceğiz
source->target.topics = .*
source->target.groups = .*

# Topic replikasyon faktörü (hedef cluster'daki)
replication.factor = 3

# Offset senkronizasyonu
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true
source->target.sync.group.offsets.enabled = true

# Heartbeat interval (ms)
source->target.heartbeats.topic.replication.factor = 1
source->target.checkpoints.topic.replication.factor = 1

Bu konfigürasyonu kaydettikten sonra MM2’yi şöyle başlatabilirsiniz:

# MirrorMaker 2'yi başlat
/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mirror-maker.properties

# Arka planda çalıştırmak için
nohup /opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mirror-maker.properties 
  > /var/log/kafka/mirrormaker.log 2>&1 &

Topic Filtreleme

Her şeyi replike etmek çoğu zaman mantıklı değil. Bazı topic’ler test amaçlı, bazıları geçici, bazıları ise DR kapsamında olmayabilir. Filtreleme için regex kullanıyoruz:

# Sadece belirli topic'leri replike et
source->target.topics = payment..*|order..*|inventory..*

# Belirli topic'leri hariç tut
source->target.topics.exclude = .*test.*|.*staging.*|__.*

# Consumer group filtreleme
source->target.groups = production-.*
source->target.groups.exclude = .*-test|.*-dev

Burada dikkat etmeniz gereken nokta: MM2 varsayılan olarak replike ettiği topic’lerin başına kaynak cluster alias’ını ekler. Yani source cluster’daki orders topic’i, target cluster’da source.orders olarak görünür. Bu davranışı değiştirmek mümkün ama dikkatli olmak gerekiyor.

Distributed Mod ile Yüksek Erişilebilirlik

Standalone mod, geliştirme ve test için uygundur. Production’da mutlaka distributed mod kullanın. Distributed modda MM2, Kafka Connect worker’ları üzerinde çalışır ve failover mekanizması built-in olarak gelir.

# /opt/kafka/config/connect-distributed.properties

bootstrap.servers = kafka-source-1:9092,kafka-source-2:9092

group.id = mirrormaker-connect-cluster

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.topic = mirrormaker-offsets
offset.storage.replication.factor = 3
offset.storage.partitions = 25

config.storage.topic = mirrormaker-configs
config.storage.replication.factor = 3

status.storage.topic = mirrormaker-status
status.storage.replication.factor = 3
status.storage.partitions = 5

# REST API için
rest.port = 8083
rest.host.name = 0.0.0.0

Worker’ları başlatmak için:

# Her worker node'unda çalıştır
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties &

# Connector'ı REST API üzerinden yükle
curl -X POST http://mirrormaker-worker-1:8083/connectors 
  -H "Content-Type: application/json" 
  -d '{
    "name": "source-mirror-source-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "source",
      "target.cluster.alias": "target",
      "source.cluster.bootstrap.servers": "kafka-source-1:9092,kafka-source-2:9092",
      "target.cluster.bootstrap.servers": "kafka-target-1:9092,kafka-target-2:9092",
      "topics": "payment.*,order.*",
      "replication.factor": "3",
      "tasks.max": "4"
    }
  }'

SSL ve Güvenlik Konfigürasyonu

Production ortamında plaintext kullanmayın. Her iki cluster için de SSL konfigürasyonu yapmanız gerekiyor:

# /opt/kafka/config/mirror-maker.properties'e ekle

# Source cluster SSL
source.security.protocol = SSL
source.ssl.truststore.location = /opt/kafka/ssl/source-truststore.jks
source.ssl.truststore.password = changeit
source.ssl.keystore.location = /opt/kafka/ssl/source-keystore.jks
source.ssl.keystore.password = changeit
source.ssl.key.password = changeit

# Target cluster SSL
target.security.protocol = SSL
target.ssl.truststore.location = /opt/kafka/ssl/target-truststore.jks
target.ssl.truststore.password = changeit
target.ssl.keystore.location = /opt/kafka/ssl/target-keystore.jks
target.ssl.keystore.password = changeit
target.ssl.key.password = changeit

SASL/PLAIN kullanan ortamlar için:

# Source cluster SASL konfigürasyonu
source.security.protocol = SASL_SSL
source.sasl.mechanism = PLAIN
source.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="mirrormaker-user" 
  password="guclu-sifre-buraya";

# Target cluster SASL konfigürasyonu
target.security.protocol = SASL_SSL
target.sasl.mechanism = PLAIN
target.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="mirrormaker-user" 
  password="guclu-sifre-buraya";

Performans Tuning

Varsayılan ayarlarla MirrorMaker çalışır ama production yükünü kaldırmak için tuning şart. Burada birkaç kritik parametre var:

# /opt/kafka/config/mirror-maker.properties

# Consumer tarafı optimizasyonu
source.consumer.fetch.min.bytes = 1048576
source.consumer.fetch.max.wait.ms = 500
source.consumer.max.partition.fetch.bytes = 10485760

# Producer tarafı optimizasyonu
target.producer.batch.size = 524288
target.producer.linger.ms = 10
target.producer.compression.type = lz4
target.producer.buffer.memory = 67108864
target.producer.max.request.size = 10485760

# Acks ayarı - DR senaryosunda all kullanın
target.producer.acks = all

# Task sayısı - partition sayısına göre ayarlayın
tasks.max = 8

tasks.max değeri çok önemli. En fazla replike edeceğiniz topic’in partition sayısını geçemez. Geçerseniz boşta worker’lar oluşur ve kaynak israf edersiniz. Ben genellikle (toplam partition sayısı / worker sayısı) formülünü kullanıyorum.

Monitoring ve Alerting

MirrorMaker’ı kurup çalıştırmak yetmez, ne kadar sağlıklı çalıştığını da izlemeniz gerekir. JMX metrikleri bu konuda ana kaynağınız olacak.

# JMX'i etkinleştirmek için başlatma scriptini düzenle
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
  -Dcom.sun.management.jmxremote.authenticate=false 
  -Dcom.sun.management.jmxremote.ssl=false 
  -Dcom.sun.management.jmxremote.port=9999"

/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mirror-maker.properties

İzlemeniz gereken kritik metrikler:

  • kafka.connect:type=mirror-source-metrics,target=target: Replikasyon lag değerleri
  • kafka.consumer:type=consumer-fetch-manager-metrics: Fetch latency
  • kafka.producer:type=producer-metrics: Produce rate ve hata oranları

Prometheus ile entegrasyon için JMX Exporter kullanabilirsiniz:

# jmx_exporter.yaml
rules:
  - pattern: 'kafka.connect<type=mirror-source-metrics, target=(.+)><>replication-latency-ms-avg'
    name: kafka_mirrormaker_replication_latency_ms
    labels:
      target: "$1"
  - pattern: 'kafka.connect<type=mirror-source-metrics, target=(.+)><>record-count'
    name: kafka_mirrormaker_record_count
    labels:
      target: "$1"

Lag monitörlemesi için basit bir script:

#!/bin/bash
# mirrormaker-lag-check.sh

SOURCE_BROKERS="kafka-source-1:9092,kafka-source-2:9092"
TARGET_BROKERS="kafka-target-1:9092,kafka-target-2:9092"
THRESHOLD=10000

# Source'daki offset'leri al
SOURCE_LAG=$(/opt/kafka/bin/kafka-consumer-groups.sh 
  --bootstrap-server $SOURCE_BROKERS 
  --group mirrormaker-consumer-group 
  --describe 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}')

if [ "$SOURCE_LAG" -gt "$THRESHOLD" ]; then
  echo "ALARM: MirrorMaker lag $SOURCE_LAG threshold'u ($THRESHOLD) asti!"
  # Alarm mekanizmanıza göre burayı özelleştirin
  # Slack, PagerDuty, email vb.
fi

echo "Mevcut lag: $SOURCE_LAG"

Gerçek Dünya Senaryosu: Active-Active Replikasyon

Tek yönlü replikasyon çoğu DR senaryosu için yeterli, ama bazı durumlarda her iki cluster’ın da yazma alabileceği active-active kurulum gerekiyor. Bu senaryoda döngüsel replikasyona (cycle) dikkat etmek şart.

MM2 bu problemi cluster alias’ları ile çözüyor. Bir mesaj source.target.orders formatında bir topic’e düşerse MM2 bunu tekrar replike etmiyor. Ama yine de konfigürasyonun doğru yapılması gerekiyor:

# active-active-mirror.properties

clusters = istanbul, ankara

istanbul.bootstrap.servers = kafka-istanbul-1:9092,kafka-istanbul-2:9092
ankara.bootstrap.servers = kafka-ankara-1:9092,kafka-ankara-2:9092

# Her iki yönde de replikasyon
istanbul->ankara.enabled = true
ankara->istanbul.enabled = true

istanbul->ankara.topics = local..*
ankara->istanbul.topics = local..*

# Döngüsel replikasyonu engelle
istanbul->ankara.topics.exclude = ankara..*
ankara->istanbul.topics.exclude = istanbul..*

# Offset senkronizasyonu her iki yönde
istanbul->ankara.sync.group.offsets.enabled = true
ankara->istanbul.sync.group.offsets.enabled = true

replication.factor = 3

Bu konfigürasyonda her cluster sadece kendi local.* topic’lerini karşı tarafa replike eder. İstanbul’daki local.payments topic’i Ankara’da istanbul.local.payments olarak görünür. Ankara kendi local.payments topic’ini de İstanbul’a ankara.local.payments olarak gönderir.

Yaygın Sorunlar ve Çözümleri

Sorun 1: Topic’ler hedef cluster’da oluşmuyor

MM2’nin topic oluşturma yetkisi olmalı. Hedef cluster’da ACL kontrol edin:

# Hedef cluster'da MirrorMaker kullanıcısına yetki ver
/opt/kafka/bin/kafka-acls.sh 
  --bootstrap-server kafka-target-1:9092 
  --add 
  --allow-principal User:mirrormaker-user 
  --operation Create 
  --topic '*'

Sorun 2: Replikasyon durdu, lag artıyor

Önce log’lara bakın, ardından consumer group durumunu kontrol edin:

# Consumer group durumunu kontrol et
/opt/kafka/bin/kafka-consumer-groups.sh 
  --bootstrap-server kafka-source-1:9092 
  --describe 
  --group mirrormaker-consumer-group

# Connector durumunu REST API ile kontrol et
curl -s http://mirrormaker-worker-1:8083/connectors/source-mirror-source-connector/status 
  | python3 -m json.tool

Sorun 3: Mesaj sırası bozuluyor

Partition sayıları kaynak ve hedef arasında uyuşmuyor olabilir. MM2 varsayılan olarak kaynak ile aynı partition sayısını kullanır ama manuel müdahale sonrası değişmiş olabilir. Kontrol edin:

# Her iki cluster'da topic partition sayısını karşılaştır
/opt/kafka/bin/kafka-topics.sh --describe 
  --bootstrap-server kafka-source-1:9092 
  --topic orders

/opt/kafka/bin/kafka-topics.sh --describe 
  --bootstrap-server kafka-target-1:9092 
  --topic source.orders

Systemd ile Servis Yönetimi

Production’da MirrorMaker’ı systemd servisi olarak yönetmek en doğrusu. Hem otomatik başlatma hem de monitoring entegrasyonu için kolaylık sağlıyor:

# /etc/systemd/system/kafka-mirrormaker.service

[Unit]
Description=Apache Kafka MirrorMaker 2
After=network.target
Requires=network.target

[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G"
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"
ExecStart=/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mirror-maker.properties
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=10
StandardOutput=append:/var/log/kafka/mirrormaker.log
StandardError=append:/var/log/kafka/mirrormaker-error.log
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
# Servisi etkinleştir ve başlat
systemctl daemon-reload
systemctl enable kafka-mirrormaker
systemctl start kafka-mirrormaker
systemctl status kafka-mirrormaker

Heap ve JVM Ayarları

MirrorMaker için JVM ayarları da kritik. Varsayılan heap genellikle yetersiz kalıyor:

# Başlatma scripti veya systemd unit dosyasında
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server 
  -XX:+UseG1GC 
  -XX:MaxGCPauseMillis=20 
  -XX:InitiatingHeapOccupancyPercent=35 
  -XX:+ExplicitGCInvokesConcurrent 
  -XX:MaxInlineLevel=15 
  -Djava.awt.headless=true"

G1GC, büyük heap boyutlarında CMS’e kıyasla çok daha tutarlı pause süreleri veriyor. Yüksek throughput ortamlarında bu farkı replikasyon latency’sinde doğrudan görebilirsiniz.

Sonuç

Kafka MirrorMaker, doğru konfigüre edildiğinde son derece güvenilir bir replikasyon aracı. Ama “doğru konfigüre edildiğinde” kısmı burada anahtar kelime. Yıllar içinde gördüğüm en yaygın sorun, MirrorMaker’ın kurulup bırakılması ve monitoring’in ihmal edilmesi oldu. Replikasyon bir gece sessizce durdu, sabah oldu ve DR cluster’ı saatlerce geride kalmış bulundu.

Önem sırasıyla şu noktalara dikkat edin: Önce monitoring kurun, sonra replikasyonu başlatın. Lag alertleriniz olmadan MM2 çalıştırmak güvenlik kemeri takmadan araba sürmek gibi. Distributed modu tercih edin çünkü standalone mod single point of failure. SSL ve SASL’ı atlamayın çünkü inter-cluster trafik genellikle farklı ağ segmentlerinden geçiyor. Offset senkronizasyonunu etkinleştirin çünkü DR senaryosunda failover sonrası consumer’ların doğru noktadan devam etmesi kritik.

MM2’nin MM1’e göre en büyük avantajı operasyonel olgunluğu. Offset senkronizasyonu, ACL replikasyonu, ve dinamik konfigürasyon değişikliği gibi özellikler legacy MM1 kurulumlarını MM2’ye taşımayı kesinlikle değer kılıyor. Eğer hala MM1 üzerindeyseniz, en yakın bakım penceresinde migrasyon planlamanızı öneririm.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir