Kafka Schema Registry ile Mesaj Şeması Yönetimi

Prodüksiyonda Kafka kullanıyorsanız ve şema yönetimini hâlâ elle yapıyorsanız, bu yazı tam size göre. Ekipler büyüdükçe, servisler çoğaldıkça ve mesaj formatları değiştikçe “hangi topic’te hangi formatta veri var?” sorusu kabusu olmaya başlıyor. Schema Registry bu kabusu ortadan kaldırmak için var.

Schema Registry Nedir ve Neden Lazım?

Kafka kendi başına şema konusunda oldukça laissez-faire bir yaklaşım benimsiyor. Topic’e ne atarsan at, bayt olarak saklıyor. Bu esneklik başlangıçta güzel görünüyor ama servis sayısı arttıkça producer ve consumer’lar arasındaki koordinasyon giderek zorlaşıyor.

Diyelim ki ödeme servisiniz bir payment.completed event’i yayınlıyor. Bu event’i dinleyen beş farklı servis var: bildirim servisi, muhasebe servisi, fraud detection, raporlama ve loyalty programı. Ödeme servisinin geliştirme ekibi amount alanını decimal yerine string olarak değiştirdi. Kim ne zaman haberdar olacak? Hangi servis patladığında fark edilecek? Prodüksiyona çıktıktan sonra mı?

İşte Schema Registry bu tür “sessiz veri bozulmalarını” engellemeye yarıyor. Confluent Platform ile birlikte gelen Confluent Schema Registry en yaygın kullanılan implementasyon. Aynı zamanda Apicurio Registry de popüler bir açık kaynak alternatif. Bu yazıda Confluent Schema Registry üzerinden gideceğiz.

Schema Registry temelde şunları yapıyor:

  • Avro, Protobuf veya JSON Schema formatlarında şema depolama
  • Her şemaya benzersiz bir ID atama
  • Şemalar arası uyumluluk (compatibility) kontrolü
  • Producer ve consumer’ların runtime’da şema doğrulaması yapmasına olanak tanıma

Kurulum ve Başlangıç Konfigürasyonu

Basit bir Docker Compose ortamıyla başlayalım. Gerçek hayatta Kubernetes’e deploy ediyorsunuz büyük ihtimalle ama kavramları anlamak için bu yeterli.

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
docker-compose up -d
# Schema Registry ayağa kalktı mı kontrol edelim
curl -s http://localhost:8081/subjects | jq .
# [] döndürmesi bekleniyor, henüz şema yok

Schema Registry, şemaları Kafka’nın kendi _schemas topic’inde saklıyor. Bu önemli bir detay: ayrı bir veritabanı bağımlılığı yok, state’i Kafka’da tutuyor.

İlk Şemayı Kaydetmek

Avro ile başlayalım. Avro, JSON tabanlı bir şema tanımlama dili ve Kafka ekosisteminde en yaygın kullanılan format. Binary encoding kullandığı için JSON’a kıyasla çok daha verimli.

# payment_completed şemasını kaydedelim
curl -X POST http://localhost:8081/subjects/payment.completed-value/versions 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{
    "schema": "{"type":"record","name":"PaymentCompleted","namespace":"com.sirket.odeme","fields":[{"name":"payment_id","type":"string"},{"name":"user_id","type":"string"},{"name":"amount","type":"double"},{"name":"currency","type":"string"},{"name":"timestamp","type":"long","logicalType":"timestamp-millis"}]}"
  }'
# {"id":1} döndürmeli

Subject isimlendirmesine dikkat edin. Varsayılan strateji TopicNameStrategy olup {topic-adı}-value veya {topic-adı}-key formatını kullanıyor. Bu strateji topic başına tek bir şema demek. RecordNameStrategy ile kayıt adına göre de yönetebilirsiniz, bu daha esnek ama daha dikkatli yönetim gerektiriyor.

Kayıtlı şemayı görmek için:

# Tüm subject'leri listele
curl -s http://localhost:8081/subjects

# Belirli bir subject'in versiyonlarını göster
curl -s http://localhost:8081/subjects/payment.completed-value/versions

# Belirli bir versiyonu getir
curl -s http://localhost:8081/subjects/payment.completed-value/versions/1 | jq .

# Şema ID'siyle getir
curl -s http://localhost:8081/schemas/ids/1 | jq .

Uyumluluk Modları: Asıl Kritik Kısım

Schema Registry’nin en değerli özelliği uyumluluk kontrolü. Yanlış konfigüre edilmiş bir uyumluluk modu, ya gereksiz kısıtlamalara ya da veri kaybına yol açar.

Uyumluluk modları şunlar:

  • BACKWARD: Yeni şemayla eski mesajlar okunabilir. Consumer’ı önce güncelle, sonra producer’ı. En sık kullanılan mod.
  • FORWARD: Eski şemayla yeni mesajlar okunabilir. Producer’ı önce güncelle, sonra consumer’ı.
  • FULL: Hem backward hem forward uyumlu. En kısıtlayıcı mod.
  • BACKWARD_TRANSITIVE: Tüm önceki versiyonlarla backward uyumlu.
  • FORWARD_TRANSITIVE: Tüm önceki versiyonlarla forward uyumlu.
  • FULL_TRANSITIVE: Tüm önceki versiyonlarla tam uyumlu.
  • NONE: Hiçbir uyumluluk kontrolü yok. Dikkatli kullanın.
# Global uyumluluk modunu görüntüle
curl -s http://localhost:8081/config | jq .

# Global modu BACKWARD olarak ayarla
curl -X PUT http://localhost:8081/config 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{"compatibility": "BACKWARD"}'

# Belirli bir subject için farklı mod ayarla
curl -X PUT http://localhost:8081/config/payment.completed-value 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{"compatibility": "FULL_TRANSITIVE"}'

Prodüksiyonda genellikle kritik topic’ler için FULL_TRANSITIVE, daha az kritik olanlar için BACKWARD kullanıyoruz.

Şema Evrimi: Gerçek Hayat Senaryosu

Diyelim ki payment.completed event’ine merchant_id alanı eklemeniz gerekiyor. BACKWARD uyumlulukta bu mümkün, ama dikkat edilmesi gereken bir nokta var: yeni alan ya opsiyonel olmalı ya da varsayılan değere sahip olmalı.

# Yeni versiyonu kaydetmeden önce uyumluluk kontrolü yap
curl -X POST http://localhost:8081/compatibility/subjects/payment.completed-value/versions/latest 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{
    "schema": "{"type":"record","name":"PaymentCompleted","namespace":"com.sirket.odeme","fields":[{"name":"payment_id","type":"string"},{"name":"user_id","type":"string"},{"name":"amount","type":"double"},{"name":"currency","type":"string"},{"name":"timestamp","type":"long","logicalType":"timestamp-millis"},{"name":"merchant_id","type":["null","string"],"default":null}]}"
  }'
# {"is_compatible":true} döndürmeli

Uyumluluk onaylandıysa yeni versiyonu kayıt edebiliriz:

curl -X POST http://localhost:8081/subjects/payment.completed-value/versions 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{
    "schema": "{"type":"record","name":"PaymentCompleted","namespace":"com.sirket.odeme","fields":[{"name":"payment_id","type":"string"},{"name":"user_id","type":"string"},{"name":"amount","type":"double"},{"name":"currency","type":"string"},{"name":"timestamp","type":"long","logicalType":"timestamp-millis"},{"name":"merchant_id","type":["null","string"],"default":null}]}"
  }'
# {"id":2} döndürmeli

Şimdi ne oldu? Eski consumer’lar merchant_id alanını tanımıyor ama null default sayesinde okumaya devam edebiliyor. Yeni consumer’lar ise her iki versiyonla da çalışabiliyor. Bu BACKWARD uyumluluğun özü.

Python ile Pratik Kullanım

Teorik kısmı geçtik, şimdi gerçek kod yazalım. confluent-kafka ve fastavro kütüphanelerini kullanacağız.

# requirements: confluent-kafka[avro], requests

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import MessageSerializer
import time

SCHEMA_REGISTRY_URL = "http://localhost:8081"
BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "payment.completed"

# Şemayı string olarak tanımla
value_schema_str = """
{
  "type": "record",
  "name": "PaymentCompleted",
  "namespace": "com.sirket.odeme",
  "fields": [
    {"name": "payment_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "merchant_id", "type": ["null", "string"], "default": null}
  ]
}
"""

value_schema = avro.loads(value_schema_str)

# Producer konfigürasyonu
producer_config = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'schema.registry.url': SCHEMA_REGISTRY_URL,
    'auto.register.schemas': False  # Prodüksiyonda şemaları otomatik kaydetme!
}

producer = AvroProducer(producer_config, default_value_schema=value_schema)

# Mesaj gönder
payment_event = {
    "payment_id": "pay_123abc",
    "user_id": "usr_456def",
    "amount": 299.99,
    "currency": "TRY",
    "timestamp": int(time.time() * 1000),
    "merchant_id": {"string": "mrc_789ghi"}
}

producer.produce(topic=TOPIC, value=payment_event)
producer.flush()
print("Mesaj gönderildi")

auto.register.schemas: False ayarını prodüksiyonda mutlaka kullanın. Aksi takdirde bir geliştirici yanlış şemayla mesaj göndermeye kalktığında Schema Registry’ye otomatik kaydedip uyumluluk kontrollerini by-pass edebilir. Şema kayıt işlemi CI/CD pipeline’ınızda kontrollü olmalı.

# Consumer konfigürasyonu
consumer_config = {
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'schema.registry.url': SCHEMA_REGISTRY_URL,
    'group.id': 'odeme-isleme-grubu',
    'auto.offset.reset': 'earliest'
}

consumer = AvroConsumer(consumer_config)
consumer.subscribe([TOPIC])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer hatasi: {msg.error()}")
        continue

    payment = msg.value()
    print(f"Odeme alindi: {payment['payment_id']}, Tutar: {payment['amount']} {payment['currency']}")

consumer.close()

CI/CD Pipeline’ına Entegrasyon

Şema yönetimini manuel yapmak uzun vadede sürdürülemez. Şemaları kaynak kodunuzla birlikte versiyon kontrolüne alın ve deployment sürecine entegre edin.

Tipik bir GitOps yaklaşımı şöyle görünebilir: Şemalar schemas/ dizininde .avsc dosyaları olarak tutuluyor. Her PR’da uyumluluk kontrolü yapılıyor, main’e merge olduğunda Schema Registry’ye kayıt ediliyor.

#!/bin/bash
# schema-check.sh - CI pipeline'ında çalışacak script

SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"}
SCHEMAS_DIR="./schemas"
EXIT_CODE=0

for schema_file in "$SCHEMAS_DIR"/*.avsc; do
    subject_name=$(basename "$schema_file" .avsc)
    echo "Kontrol ediliyor: $subject_name"

    # Subject var mi diye kontrol et
    http_status=$(curl -s -o /dev/null -w "%{http_code}" 
        "$SCHEMA_REGISTRY_URL/subjects/$subject_name/versions/latest")

    if [ "$http_status" == "404" ]; then
        echo "  Yeni subject, uyumluluk kontrolu atlanıyor: $subject_name"
        continue
    fi

    # Uyumluluk kontrolu
    schema_content=$(cat "$schema_file" | tr -d 'n' | sed 's/"/\"/g')
    result=$(curl -s -X POST 
        "$SCHEMA_REGISTRY_URL/compatibility/subjects/$subject_name/versions/latest" 
        -H "Content-Type: application/vnd.schemaregistry.v1+json" 
        -d "{"schema": "$schema_content"}")

    is_compatible=$(echo $result | jq -r '.is_compatible')

    if [ "$is_compatible" != "true" ]; then
        echo "  HATA: $subject_name uyumsuz degisiklik iceriyor!"
        echo "  Detay: $result"
        EXIT_CODE=1
    else
        echo "  OK: $subject_name uyumlu"
    fi
done

exit $EXIT_CODE

Bu script’i GitHub Actions veya GitLab CI’a kolayca entegre edebilirsiniz. Pull request aşamasında çalıştırırsanız, uyumsuz şema değişikliklerinin prodüksiyona ulaşmasını engellemiş olursunuz.

Schema Registry’yi İzlemek

Prodüksiyonda Schema Registry’nin sağlığını takip etmek önemli. Birkaç kritik endpoint ve metrik:

# Sağlık durumu
curl -s http://localhost:8081/ | jq .

# JMX metrikleri aktifse (SCHEMA_REGISTRY_JMX_PORT env var)
# Prometheus ile scrape için özel exporter gerekiyor

# Subject ve versiyon sayısını öğren
subject_count=$(curl -s http://localhost:8081/subjects | jq 'length')
echo "Toplam subject sayisi: $subject_count"

# Her subject icin versiyon sayisini göster
for subject in $(curl -s http://localhost:8081/subjects | jq -r '.[]'); do
    version_count=$(curl -s "http://localhost:8081/subjects/$subject/versions" | jq 'length')
    echo "$subject: $version_count versiyon"
done

Schema Registry için izlenmesi gereken başlıca şeyler:

  • Yanıt süresi: /subjects endpoint’inin yanıt süresi genellikle iyi bir genel sağlık göstergesi
  • _schemas topic’i: Bu topic’in lag’i artıyorsa Schema Registry’nin Kafka’yı takip etmekte zorlandığını gösterir
  • Bellek kullanımı: Şema cache’i JVM heap’inde tutuluyor, binlerce şema varsa dikkat edin

Yaygın Sorunlar ve Çözümleri

Prodüksiyonda karşılaştığımız birkaç tipik sorun:

Şema kaydında 409 Conflict hatası: Aynı içerikle farklı bir kayıt yapmaya çalışıyorsunuz muhtemelen. Schema Registry aynı içerikli şemayı yeniden kaydetmez, mevcut ID’yi döner. Sorun değil ama CI’da bazen kafa karıştırıcı olabiliyor.

SerializationException: Error retrieving Avro schema: Consumer şema ID’sini bulamıyor. Schema Registry erişilebilir mi? Network politikalarını kontrol edin. Kubernetes’te servis DNS’ini doğru yazdığınızdan emin olun.

Uyumluluk kontrolünü geçici olarak devre dışı bırakmak: Bazen zorunlu bir breaking change yapmanız gerekiyor. Bu durumu dikkatli yönetmek lazım:

# Son derece dikkatli kullanin, geri almayı unutmayin
curl -X PUT http://localhost:8081/config/payment.completed-value 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{"compatibility": "NONE"}'

# Breaking change'i kaydet
# ... şema kayıt işlemi ...

# Hemen geri al
curl -X PUT http://localhost:8081/config/payment.completed-value 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{"compatibility": "BACKWARD"}'

Bu operasyonu yapıyorsanız tüm consumer’ların yeni şemayı desteklediğinden emin olun ve koordineli bir deployment yapın.

Soft delete vs Hard delete: Schema Registry varsayılan olarak soft delete yapıyor. Şema ID’leri kaybolmuyor, sadece “deleted” olarak işaretleniyor. Hard delete için:

# Önce soft delete
curl -X DELETE "http://localhost:8081/subjects/test-subject/versions/1"

# Sonra hard delete (kalici)
curl -X DELETE "http://localhost:8081/subjects/test-subject/versions/1?permanent=true"

Prodüksiyonda hard delete’i neredeyse hiç kullanmak istemezsiniz. Herhangi bir consumer hâlâ o şema ID’sini referans alan bir mesaj okumaya çalışıyorsa patlar.

Protobuf ve JSON Schema Alternatifleri

Avro her zaman doğru seçim değil. Bazı ekipler gRPC altyapısıyla tutarlılık için Protobuf tercih ediyor.

# Protobuf şeması kaydetmek
curl -X POST http://localhost:8081/subjects/order.created-value/versions 
  -H "Content-Type: application/vnd.schemaregistry.v1+json" 
  -d '{
    "schemaType": "PROTOBUF",
    "schema": "syntax = "proto3";npackage com.sirket.siparis;nnmessage OrderCreated {n  string order_id = 1;n  string customer_id = 2;n  double total_amount = 3;n  repeated string product_ids = 4;n}n"
  }'

JSON Schema ise şema doğrulamasına başlamak için en düşük geçiş eşiği. Mevcut REST API’larınızda JSON kullanıyorsanız ve Avro’ya geçmek büyük bir değişiklikse, JSON Schema ile başlamak mantıklı olabilir. Ama binary encoding’in getirdiği performans avantajından yararlanamıyorsunuz.

Genel kural olarak: yüksek throughput, düşük latency gerektiren durumlar için Avro veya Protobuf; hızlı başlangıç, daha az strict ekosistemler için JSON Schema.

Sonuç

Schema Registry, Kafka kullanan her ciddi prodüksiyon ortamının ihtiyaç duyduğu bir yapı taşı. Kurulumu nispeten basit ama konfigürasyon detayları, özellikle uyumluluk modları ve CI/CD entegrasyonu, düzgün planlamayı gerektiriyor.

En kritik noktaları özetleyecek olursam: prodüksiyonda auto.register.schemas‘ı mutlaka kapatın, şemalarınızı kaynak kodunuzla birlikte versiyon kontrolüne alın, uyumluluk modunu iş gereksinimlerinize göre seçin ve şema değişikliklerini deployment sürecinizin bir parçası haline getirin.

Schema Registry’yi doğru kurduğunuzda, producer ve consumer ekipleri birbirinden bağımsız çalışabilir hale geliyor. “Bu mesajda hangi alanlar vardı?” diye Slack’te birbirini bulmak yerine herkes Registry’ye bakıyor. Bu küçük bir şey gibi görünse de büyük ekiplerde kurtarılan zaman ve önlenen prodüksiyon olayları açısından değeri tartışılmaz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir