Azure Event Hub ile Veri Akışı Yönetimi

Büyük ölçekli sistemlerde veri akışını yönetmek her zaman sysadmin’lerin kâbusu olmuştur. Logları toplamak, telemetri verilerini işlemek, farklı sistemler arasında mesaj iletmek… Tüm bunları güvenilir, ölçeklenebilir ve yönetilebilir bir şekilde yapmak ciddi bir iş. Azure Event Hub tam da bu noktada devreye giriyor. Saniyede milyonlarca olayı işleyebilen, tamamen yönetilen bu hizmet, özellikle büyük veri senaryolarında ve IoT projelerinde hayat kurtarıcı oluyor. Bu yazıda Azure Event Hub’ı sıfırdan kuracağız, gerçek dünya senaryolarında nasıl kullanacağımızı inceleyeceğiz ve üretime hazır bir yapı oluşturacağız.

Azure Event Hub Nedir ve Ne Zaman Kullanılır

Event Hub, Apache Kafka ile benzer bir mantıkta çalışan, Microsoft’un tam yönetimli mesaj akışı platformudur. Temel fark şu: Kafka’yı kendiniz kurmanız, yönetmeniz, patch’lemeniz gerekir. Event Hub’da bu işlerin hepsini Microsoft yapıyor.

Event Hub’ın parladığı senaryolar şunlar:

  • Telemetri toplama: Binlerce sunucudan, IoT cihazından veya uygulamadan gelen metrik ve log verilerini merkezi olarak toplamak
  • Clickstream analizi: Web sitesi veya uygulama kullanıcı davranışlarını gerçek zamanlı izlemek
  • Fraud detection: Finansal işlemleri anlık olarak analiz edip şüpheli aktiviteleri tespit etmek
  • Log aggregation: Farklı sistemlerden gelen logları tek noktada birleştirmek
  • Event sourcing: Mikroservis mimarisinde olayları kayıt altına almak

Kafka uyumlu bir API sunduğu için mevcut Kafka producer’larınızı minimal değişiklikle Event Hub’a bağlayabilirsiniz. Bu özellik geçiş süreçlerini dramatik şekilde kolaylaştırıyor.

Temel Kavramlar

Sistemi anlamadan kullanmak her zaman sorun çıkarır. Birkaç temel kavramı netleştirelim:

Namespace: Event Hub’larınızı gruplamak için üst düzey kapsayıcı. Bir namespace içinde birden fazla Event Hub oluşturabilirsiniz. Faturalama ve kapasite yönetimi namespace seviyesinde yapılır.

Event Hub (Topic): Namespace içindeki mantıksal veri akış kanalı. Kafka’daki topic’e karşılık gelir.

Partition: Event Hub içindeki paralel veri akış kanalları. Partition sayısı ne kadar yüksekse paralel okuma kapasitesi o kadar artar. Önemli not: Partition sayısını oluşturduktan sonra değiştiremezsiniz (Premium ve Dedicated tier hariç).

Consumer Group: Aynı Event Hub’ı farklı amaçlarla okuyan uygulama grupları. Örneğin aynı telemetri verisini hem Elasticsearch’e hem de Azure Stream Analytics’e gönderebilirsiniz.

Throughput Unit (TU): Standard tier’da kapasite birimi. 1 TU = 1 MB/sn giriş, 2 MB/sn çıkış kapasitesi sağlar.

Event Hub Namespace Oluşturma

Azure CLI ile her şeyi yapabiliriz. Portal tıklamak yerine scripting yaklaşımını tercih edeceğim çünkü tekrarlanabilir ve versiyon kontrol edilebilir altyapı kurmak şart.

# Resource group oluştur
az group create 
  --name rg-eventhub-prod 
  --location westeurope

# Event Hub Namespace oluştur
az eventhubs namespace create 
  --resource-group rg-eventhub-prod 
  --name ehns-mycompany-prod 
  --location westeurope 
  --sku Standard 
  --capacity 2 
  --enable-auto-inflate true 
  --maximum-throughput-units 10 
  --tags Environment=Production Team=Platform

# Event Hub oluştur
az eventhubs eventhub create 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --name eh-telemetry 
  --partition-count 8 
  --message-retention 3 
  --enable-capture false

# Consumer group oluştur
az eventhubs eventhub consumer-group create 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --eventhub-name eh-telemetry 
  --name cg-elasticsearch

az eventhubs eventhub consumer-group create 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --eventhub-name eh-telemetry 
  --name cg-stream-analytics

--enable-auto-inflate true ayarı önemli. Bu sayede trafik artışlarında throughput unit’ler otomatik olarak artıyor ve elle müdahale etmek zorunda kalmıyorsunuz. --maximum-throughput-units ile de bütçe patlamasının önüne geçiyorsunuz.

Erişim Yönetimi ve Güvenlik

Bağlantı string’leriyle çalışmak hâlâ yaygın ama üretim ortamlarında Managed Identity kullanmak çok daha güvenli. Her iki yöntemi de gösterelim.

# SAS Policy oluştur (producer için)
az eventhubs eventhub authorization-rule create 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --eventhub-name eh-telemetry 
  --name SendPolicy 
  --rights Send

# SAS Policy oluştur (consumer için)
az eventhubs eventhub authorization-rule create 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --eventhub-name eh-telemetry 
  --name ListenPolicy 
  --rights Listen

# Connection string al
az eventhubs eventhub authorization-rule keys list 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --eventhub-name eh-telemetry 
  --name SendPolicy 
  --query primaryConnectionString 
  --output tsv

# Managed Identity için rol ataması
# Önce uygulamanızın Service Principal ID'sini alın
APP_PRINCIPAL_ID=$(az ad sp show --id "your-app-id" --query id -o tsv)

# Event Hub namespace üzerinde Azure Event Hubs Data Sender rolü ver
az role assignment create 
  --role "Azure Event Hubs Data Sender" 
  --assignee $APP_PRINCIPAL_ID 
  --scope "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/rg-eventhub-prod/providers/Microsoft.EventHub/namespaces/ehns-mycompany-prod"

# Consumer için Data Receiver rolü ver
az role assignment create 
  --role "Azure Event Hubs Data Receiver" 
  --assignee $APP_PRINCIPAL_ID 
  --scope "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/rg-eventhub-prod/providers/Microsoft.EventHub/namespaces/ehns-mycompany-prod"

Üretimde bağlantı string’lerini Key Vault’ta saklayın. Connection string’i doğrudan uygulama konfigürasyonuna gömmek ciddi güvenlik açığı oluşturur.

Python ile Producer ve Consumer Yazmak

Gerçek dünya uygulamalarında Python çok kullanılıyor. Azure SDK ile hem gönderici hem alıcı taraf kodunu yazalım.

# Gerekli kütüphaneleri kur
pip install azure-eventhub azure-identity azure-storage-blob

# Producer scripti
cat > event_producer.py << 'EOF'
import asyncio
import json
import time
from datetime import datetime
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
from azure.identity.aio import DefaultAzureCredential

EVENTHUB_FQDN = "ehns-mycompany-prod.servicebus.windows.net"
EVENTHUB_NAME = "eh-telemetry"

async def send_telemetry_batch():
    credential = DefaultAzureCredential()
    
    async with EventHubProducerClient(
        fully_qualified_namespace=EVENTHUB_FQDN,
        eventhub_name=EVENTHUB_NAME,
        credential=credential
    ) as producer:
        
        # Batch oluştur - tek tek göndermek yerine batch kullan
        async with producer.create_batch() as batch:
            for i in range(100):
                telemetry = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "server_id": f"srv-{i % 10:03d}",
                    "cpu_usage": round(45.5 + (i * 0.3), 2),
                    "memory_mb": 2048 + (i * 10),
                    "disk_io": round(120.5 + i, 1),
                    "region": "westeurope"
                }
                
                event = EventData(json.dumps(telemetry))
                # Partition key ile ilgili server'ın verisini aynı partition'a gönder
                event.properties = {
                    "partition_key": telemetry["server_id"]
                }
                
                try:
                    batch.add(event)
                except ValueError:
                    # Batch doldu, gönder ve yeni batch oluştur
                    await producer.send_batch(batch)
                    async with producer.create_batch() as batch:
                        batch.add(event)
            
            await producer.send_batch(batch)
            print(f"Batch gönderildi: {datetime.utcnow().isoformat()}")

asyncio.run(send_telemetry_batch())
EOF

python event_producer.py
# Consumer scripti - Checkpoint Store ile
cat > event_consumer.py << 'EOF'
import asyncio
import json
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential

EVENTHUB_FQDN = "ehns-mycompany-prod.servicebus.windows.net"
EVENTHUB_NAME = "eh-telemetry"
CONSUMER_GROUP = "cg-elasticsearch"
STORAGE_ACCOUNT = "https://stcheckpoints.blob.core.windows.net"
CHECKPOINT_CONTAINER = "checkpoints"

async def process_event(partition_context, event):
    try:
        data = json.loads(event.body_as_str())
        print(f"Partition {partition_context.partition_id}: "
              f"Server {data['server_id']} - CPU: {data['cpu_usage']}%")
        
        # Burada Elasticsearch'e veya başka bir hedefe yazma işlemi yapılır
        # await elasticsearch_client.index(data)
        
        # Checkpoint kaydet - crash sonrası kaldığı yerden devam etmek için
        await partition_context.update_checkpoint(event)
        
    except Exception as e:
        print(f"Event işleme hatası: {e}")

async def main():
    credential = DefaultAzureCredential()
    
    # Blob storage checkpoint store - hangi event'e kadar işlendiği burada tutulur
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=STORAGE_ACCOUNT,
        container_name=CHECKPOINT_CONTAINER,
        credential=credential
    )
    
    async with EventHubConsumerClient(
        fully_qualified_namespace=EVENTHUB_FQDN,
        eventhub_name=EVENTHUB_NAME,
        consumer_group=CONSUMER_GROUP,
        checkpoint_store=checkpoint_store,
        credential=credential
    ) as client:
        
        print("Event Hub dinleniyor...")
        await client.receive(
            on_event=process_event,
            starting_position="-1"  # En baştan başla, checkpoint yoksa
        )

asyncio.run(main())
EOF

python event_consumer.py

Checkpoint store kullanımı kritik. Consumer uygulamanız çöktüğünde veya yeniden başladığında kaldığı yerden devam eder. Blob storage en yaygın checkpoint backend’idir.

Event Capture ile Azure Data Lake’e Arşivleme

Gerçek zamanlı işlemenin yanı sıra, verileri uzun vadeli saklama ve batch analiz için Data Lake’e atmak isteyebilirsiniz. Capture özelliği bunu otomatik yapar.

# Önce storage hesabı ve container oluştur
az storage account create 
  --name sttelemetryarchive 
  --resource-group rg-eventhub-prod 
  --location westeurope 
  --sku Standard_LRS 
  --kind StorageV2 
  --hierarchical-namespace true

az storage container create 
  --name telemetry-raw 
  --account-name sttelemetryarchive

# Event Hub'da Capture'ı etkinleştir
az eventhubs eventhub update 
  --resource-group rg-eventhub-prod 
  --namespace-name ehns-mycompany-prod 
  --name eh-telemetry 
  --enable-capture true 
  --capture-interval 300 
  --capture-size-limit 314572800 
  --destination-name EventHubArchive.AzureBlockBlob 
  --storage-account "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/rg-eventhub-prod/providers/Microsoft.Storage/storageAccounts/sttelemetryarchive" 
  --blob-container telemetry-raw 
  --archive-name-format "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"

--capture-interval 300 saniye cinsinden 5 dakikayı ifade eder. --capture-size-limit ise byte cinsinden yaklaşık 300 MB anlamına gelir. Hangisi önce doluyorsa dosya o anda kapatılır ve yeni dosya açılır. Veriler Avro formatında kaydedilir, Databricks veya Spark ile doğrudan okuyabilirsiniz.

Monitoring ve Alerting Kurulumu

Event Hub’ı kurdunuz, çalışıyor, güzel. Ama ne zaman sorun var? Nasıl anlıyorsunuz? İzleme olmadan yönetim olmaz.

# Diagnostic settings - Log Analytics'e gönder
LOG_ANALYTICS_ID=$(az monitor log-analytics workspace show 
  --resource-group rg-monitoring 
  --workspace-name law-platform 
  --query id -o tsv)

EVENTHUB_NS_ID=$(az eventhubs namespace show 
  --resource-group rg-eventhub-prod 
  --name ehns-mycompany-prod 
  --query id -o tsv)

az monitor diagnostic-settings create 
  --name "eventhub-diagnostics" 
  --resource $EVENTHUB_NS_ID 
  --workspace $LOG_ANALYTICS_ID 
  --logs '[
    {"category": "OperationalLogs", "enabled": true},
    {"category": "AutoScaleLogs", "enabled": true},
    {"category": "KafkaCoordinatorLogs", "enabled": true}
  ]' 
  --metrics '[
    {"category": "AllMetrics", "enabled": true, "retentionPolicy": {"enabled": true, "days": 30}}
  ]'

# Throttling alert - TU limiti dolmaya başladığında uyar
az monitor metrics alert create 
  --name "eventhub-throttling-alert" 
  --resource-group rg-eventhub-prod 
  --scopes $EVENTHUB_NS_ID 
  --condition "avg ThrottledRequests > 10" 
  --window-size 5m 
  --evaluation-frequency 1m 
  --severity 2 
  --description "Event Hub throttling başladı, TU artışı gerekebilir" 
  --action-group "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/rg-monitoring/providers/Microsoft.Insights/actionGroups/ag-platform-team"

# Incoming messages sayısı sıfıra düştüğünde uyar (producer'da sorun var demektir)
az monitor metrics alert create 
  --name "eventhub-no-incoming-alert" 
  --resource-group rg-eventhub-prod 
  --scopes $EVENTHUB_NS_ID 
  --condition "avg IncomingMessages < 1" 
  --window-size 10m 
  --evaluation-frequency 5m 
  --severity 1 
  --description "10 dakikadır hiç mesaj gelmiyor!"

Log Analytics’te şu KQL sorgusunu kullanarak throttling durumunu izleyebilirsiniz:

# Bu sorguyu Azure Monitor'da çalıştırın
cat << 'EOF'
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where MetricName in ("ThrottledRequests", "IncomingMessages", "OutgoingMessages", "ActiveConnections")
| summarize avg(Average) by bin(TimeGenerated, 5m), MetricName
| render timechart
EOF

Gerçek Dünya Senaryosu: Çok Bölgeli Log Toplama

Düşünelim: 5 farklı Azure bölgesinde çalışan mikroservisleriniz var. Tüm logları merkezi olarak Event Hub’a gönderip oradan Elasticsearch’e yüklemek istiyorsunuz.

# Her bölgede ayrı Event Hub oluştur (latency azaltmak için)
for REGION in westeurope northeurope eastus westus2 southeastasia; do
  az eventhubs namespace create 
    --resource-group rg-eventhub-prod 
    --name "ehns-mycompany-${REGION}" 
    --location $REGION 
    --sku Standard 
    --capacity 1 
    --enable-auto-inflate true 
    --maximum-throughput-units 5

  az eventhubs eventhub create 
    --resource-group rg-eventhub-prod 
    --namespace-name "ehns-mycompany-${REGION}" 
    --name eh-applogs 
    --partition-count 4 
    --message-retention 1

  echo "Namespace oluşturuldu: ehns-mycompany-${REGION}"
done

# Her namespace için merkezi consumer'ın kullanacağı policy oluştur
for REGION in westeurope northeurope eastus westus2 southeastasia; do
  az eventhubs eventhub authorization-rule create 
    --resource-group rg-eventhub-prod 
    --namespace-name "ehns-mycompany-${REGION}" 
    --eventhub-name eh-applogs 
    --name CentralConsumerPolicy 
    --rights Listen

  echo "Policy bağlantı string'i (${REGION}):"
  az eventhubs eventhub authorization-rule keys list 
    --resource-group rg-eventhub-prod 
    --namespace-name "ehns-mycompany-${REGION}" 
    --eventhub-name eh-applogs 
    --name CentralConsumerPolicy 
    --query primaryConnectionString 
    --output tsv
done

Bu mimaride her bölgedeki servisler en yakın Event Hub namespace’ine yazıyor. Merkezdeki bir consumer uygulaması tüm namespace’leri dinleyip verileri birleştiriyor. Network latency minimumda kalıyor, veri kaybı riski azalıyor.

Event Hub ile Kafka Uyumluluğu

Mevcut Kafka altyapınız varsa ve Event Hub’a geçmek istiyorsanız, Kafka protokol desteği sayesinde producer ve consumer kodunuzu değiştirmenize gerek yok. Sadece connection bilgilerini güncelleyeceksiniz.

# Kafka uyumlu bağlantı için SASL/SSL konfigürasyonu
cat > kafka-producer.properties << 'EOF'
bootstrap.servers=ehns-mycompany-prod.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="$ConnectionString" 
  password="Endpoint=sb://ehns-mycompany-prod.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=YOUR_KEY_HERE";

# Performance ayarları
batch.size=65536
linger.ms=5
compression.type=gzip
acks=1
EOF

# Kafka console producer ile test et
kafka-console-producer.sh 
  --producer.config kafka-producer.properties 
  --topic eh-telemetry 
  --bootstrap-server ehns-mycompany-prod.servicebus.windows.net:9093

Kafka topic isimleri Event Hub isimleriyle birebir eşleşiyor. --topic eh-telemetry yazınca doğrudan o Event Hub’a bağlanıyorsunuz.

Performans Optimizasyonu ve En İyi Pratikler

Birkaç yıllık Event Hub deneyiminden derlediğim kritik noktalar:

Partition sayısı seçimi: Producer sayısı ve consumer parallelism göz önüne alınarak belirleyin. 8-32 arası genellikle iyi bir başlangıç. Çok az partition paralel okumayı kısıtlar, çok fazla partition yönetim overhead’i artırır.

Batch gönderim: Tek tek event göndermek yerine her zaman batch kullanın. Batch gönderimlerde throughput 10-15x artabilir. create_batch() metodunu tercih edin ve batch’in dolması durumunu handle edin.

Retention süresi: Varsayılan 1 gündür. Veri kaybı riskini azaltmak için consumer’larınızın işlem kapasitesine göre artırın. 7 güne kadar uzatabilirsiniz Standard tier’da.

Consumer Group yönetimi: Her uygulama için ayrı consumer group kullanın. Aynı consumer group’u birden fazla uygulama paylaşırsa partition başına tek reader kısıtından dolayı sorun yaşarsınız.

Dead letter handling: İşlenemeyen event’ler için ayrı bir Event Hub veya Storage Queue kullanın. Hatalı event’ler kaybolmasın.

Connection pooling: Her event için yeni connection açmayın. Producer ve consumer client’larını uygulama yaşam döngüsü boyunca singleton olarak kullanın.

Auto-inflate dikkatli kullan: Auto-inflate TU’ları artırır ama azaltmaz. Gece yarısı gelen bir trafik spike’ı sabaha kadar yüksek TU’da çalışmanıza neden olabilir. Maliyeti takip edin.

Maliyet Yönetimi

Event Hub maliyetleri namespace tier’ına ve tüketilen TU miktarına göre şekillenir.

Standard tier’da dikkat edilmesi gereken maliyet kalemleri:

  • Throughput Unit: Saatlik ücretlendirilir. Kullanmasanız bile namespace ayakta olduğu sürece ödersiniz.
  • Ingress events: İlk 1 milyon event’e kadar ücretsiz, sonrası GB başına ücret.
  • Capture: Ek ücrete tabidir, ihtiyaç duymuyorsanız kapatın.
  • Extended retention: 1 günden fazla retention için ek maliyet.
# Kullanılmayan namespace'leri bul ve temizle
az eventhubs namespace list 
  --resource-group rg-eventhub-prod 
  --query "[?sku.name=='Standard'].{Name:name, Location:location, TU:sku.capacity}" 
  --output table

# Namespace metriklerini kontrol et - gerçekten kullanılıyor mu?
az monitor metrics list 
  --resource "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/rg-eventhub-prod/providers/Microsoft.EventHub/namespaces/ehns-mycompany-prod" 
  --metric "IncomingMessages" 
  --start-time "2024-01-01T00:00:00Z" 
  --end-time "2024-01-08T00:00:00Z" 
  --interval PT1H 
  --query "value[0].timeseries[0].data[*].{Time:timeStamp, Messages:total}" 
  --output table

Dev/test namespace’lerini mesai saatleri dışında silip yeniden oluşturabilirsiniz. Terraform veya Bicep ile bunu otomatikleştirmek mümkün.

Sonuç

Azure Event Hub, doğru konfigüre edildiğinde gerçekten güçlü bir veri akışı platformu. Kafka’yı kendiniz yönetmek istemiyorsanız veya Azure ekosistemiyle derin entegrasyon lazımsa Event Hub mantıklı bir seçim. Özellikle auto-inflate, managed identity desteği ve Capture özelliği üretim ortamlarında ciddi operasyonel yük alıyor üstünüzden.

Dikkat edilmesi gereken noktalar var tabii. Partition sayısını baştan doğru belirleyin çünkü sonradan değiştiremezsiniz. Consumer group’ları iyi planlayın. Checkpoint store’u mutlaka kullanın, aksi halde her yeniden başlamada baştan okuma yaparsınız. Monitoring ve alerting’i ihmal etmeyin, throttling başladığında haber almak istersiniz.

Kafka’dan geçiş yapıyorsanız Kafka uyumlu API büyük kolaylık sağlıyor. Minimal kod değişikliği ile test edip geçişi değerlendirebilirsiniz. Maliyet tarafını da gözden kaçırmayın, TU bazlı faturalama düşük trafikli ortamlarda pahalıya gelebilir. Bu durumda Serverless tier veya daha küçük kapasitede başlamak mantıklı.

Sonuçta Event Hub bir araç, onu doğru kullanmak sizin elinizde. İyi tasarlanmış bir Event Hub altyapısı hem geliştirici ekiplerin hem de ops ekiplerinin hayatını kolaylaştırıyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir