GCP Pub/Sub Mesajlaşma Sistemi Kurulumu ve Yönetimi
Dağıtık sistemler kurduğunuzda en sık karşılaşılan sorunlardan biri servisler arası iletişimi güvenilir ve ölçeklenebilir şekilde yönetmektir. Bir servisten diğerine direkt HTTP çağrısı yapmak başlangıçta basit görünse de, sistemin büyümesiyle birlikte bu yaklaşım ciddi kırılganlıklara yol açar. GCP Pub/Sub tam da bu noktada devreye girer ve Google’ın yönettiği, tamamen serverless bir mesajlaşma altyapısı sunar.
Pub/Sub Nedir ve Ne Zaman Kullanılır
Pub/Sub, Publisher-Subscriber (Yayıncı-Abone) desenini uygulayan bir mesajlaşma servisidir. Temel fikir şu: mesaj gönderen taraf (publisher) mesajı kime gideceğini bilmez, sadece bir topic‘e yazar. Mesajı alan taraf (subscriber) ise bu topic’e abone olmuş bir subscription üzerinden mesajları çeker.
Bu modelin gerçek hayatta işe yaradığı senaryolar:
- E-ticaret sitesinde sipariş verildiğinde envanter, fatura ve kargo servislerini paralel tetiklemek
- Log verilerini merkezi bir sisteme aktarmadan önce tamponlamak
- Mikroservisler arası asenkron iletişim kurmak
- Büyük veri pipeline’larında stream processing yapmak
- IoT cihazlarından gelen telemetri verilerini işlemek
Bir e-ticaret senaryosunu ele alalım: Kullanıcı sipariş verdiğinde siparişi doğrulayan servis, envanter güncelleyen servis, fatura oluşturan servis ve kargo firmasına bildirim gönderen servis aynı anda tetiklenmesi gerekiyor. Pub/Sub olmadan bu servisleri birbirine bağlamak hem karmaşık hem de kırılgan olur. Bir servis çökerse diğerleri de etkilenir. Pub/Sub ile her servis bağımsız çalışır, mesajlar güvenilir şekilde iletilir.
Gereksinimler ve Ön Hazırlık
Başlamadan önce şunlar hazırda olmalı:
- Aktif bir GCP projesi
- Faturalandırması açık hesap
gcloudCLI kurulu ve yapılandırılmış- Temel Linux komut satırı bilgisi
gcloud kurulu değilse önce onu kuralım:
# Debian/Ubuntu için
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
# gcloud init ile yapılandır
gcloud init
# Aktif hesabı kontrol et
gcloud auth list
# Aktif projeyi kontrol et
gcloud config get-value project
Pub/Sub API’sini projenizde etkinleştirmeniz gerekiyor:
# Pub/Sub API'yi etkinleştir
gcloud services enable pubsub.googleapis.com
# Etkin olduğunu doğrula
gcloud services list --enabled | grep pubsub
Topic Oluşturma ve Yapılandırma
Topic, mesajların yayınlandığı kanaldır. İyi bir isimlendirme politikası sisteminizi ilerleyen dönemde çok daha yönetilebilir kılar. Genellikle proje-ortam-nesne-eylem formatı işe yarar.
# Basit bir topic oluştur
gcloud pubsub topics create siparis-olusturuldu
# Daha açıklayıcı isimlendirme ile
gcloud pubsub topics create eticaret-prod-siparis-olusturuldu
# Mesaj saklama süresi ile topic oluştur (varsayılan 7 gün, max 31 gün)
gcloud pubsub topics create siparis-olusturuldu
--message-retention-duration=7d
# Mevcut topic'leri listele
gcloud pubsub topics list
# Topic detaylarını görüntüle
gcloud pubsub topics describe siparis-olusturuldu
Mesaj saklama süresi önemli bir parametre. Eğer tüm subscriber’larınız mesajı tükettikten sonra bile mesajların bir süre kalmasını istiyorsanız bunu ayarlayın. Bu özellikle debug ve replay senaryolarında işe yarar.
Subscription Oluşturma
Subscription, topic’ten mesaj okuyan yapıdır. İki ana modu vardır:
Pull: Subscriber aktif olarak mesaj çeker. Batch işleme, kendi hızında tüketme gibi durumlar için ideal.
Push: Pub/Sub, mesajları belirlediğiniz bir HTTP endpoint’e gönderir. Web hook benzeri yapılar, serverless fonksiyonlar için ideal.
# Pull subscription oluştur
gcloud pubsub subscriptions create envanter-sub
--topic=siparis-olusturuldu
# Push subscription oluştur
gcloud pubsub subscriptions create bildirim-sub
--topic=siparis-olusturuldu
--push-endpoint=https://api.sirketim.com/webhook/siparis
# Acknowledgement deadline ile subscription (saniye cinsinden, 10-600 arası)
gcloud pubsub subscriptions create fatura-sub
--topic=siparis-olusturuldu
--ack-deadline=60
# Mesaj saklama süresi ve dead letter policy ile
gcloud pubsub subscriptions create kargo-sub
--topic=siparis-olusturuldu
--ack-deadline=30
--message-retention-duration=2d
--expiration-period=never
# Subscription'ları listele
gcloud pubsub subscriptions list
# Subscription detayı
gcloud pubsub subscriptions describe envanter-sub
--ack-deadline: Subscriber mesajı aldıktan sonra kaç saniye içinde acknowledge etmezse mesaj yeniden teslim edilir. Uzun süren işlemler için bu değeri artırın.
--message-retention-duration: Acknowledge edilmemiş mesajların ne kadar süre tutulacağı.
--expiration-period: Subscription aktif kullanım olmaksızın kaç günde silinsin. never yazarsanız silinmez.
Mesaj Gönderme ve Alma
Komut satırından test etmek için basit örnekler:
# Tek mesaj gönder
gcloud pubsub topics publish siparis-olusturuldu
--message='{"siparis_id": "12345", "musteri_id": "67890", "tutar": 299.99}'
# Mesaj attribute'ları ile gönder (metadata)
gcloud pubsub topics publish siparis-olusturuldu
--message='{"siparis_id": "12346"}'
--attribute=kaynak=web,versiyon=v2,oncelik=yuksek
# Birden fazla mesaj gönder (script ile)
for i in {1..5}; do
gcloud pubsub topics publish siparis-olusturuldu
--message="{"siparis_id": "$i", "test": true}"
echo "Mesaj $i gönderildi"
done
# Pull subscription'dan mesaj çek
gcloud pubsub subscriptions pull envanter-sub --limit=5
# Mesajı otomatik acknowledge ederek çek
gcloud pubsub subscriptions pull envanter-sub
--limit=10
--auto-ack
Gerçek ortamda mesajları CLI ile göndermek yerine uygulama kodundan gönderirsiniz. Python ile örnek:
#!/usr/bin/env python3
# publisher.py - Sipariş olayı yayınlama servisi
import json
import time
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
PROJECT_ID = "proje-id-buraya"
TOPIC_ID = "siparis-olusturuldu"
def siparis_yayinla(siparis_id: str, musteri_id: str, tutar: float):
"""Yeni sipariş olayını Pub/Sub topic'ine yayınlar."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
mesaj_data = {
"siparis_id": siparis_id,
"musteri_id": musteri_id,
"tutar": tutar,
"zaman_damgasi": int(time.time())
}
# Mesajı JSON string'e çevir ve byte'a encode et
data = json.dumps(mesaj_data).encode("utf-8")
# Attribute'lar ile birlikte gönder
gelecek = publisher.publish(
topic_path,
data,
kaynak="web-app",
versiyon="v2",
ortam="production"
)
# Yayınlama sonucunu bekle
mesaj_id = gelecek.result(timeout=30)
print(f"Mesaj yayınlandı. ID: {mesaj_id}")
return mesaj_id
def toplu_siparis_yayinla(siparisler: list):
"""Birden fazla siparişi batch olarak yayınlar."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
# Batch ayarları - performansı optimize eder
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=100, # En fazla 100 mesaj bekle
max_bytes=1024 * 1024, # 1MB dolana kadar bekle
max_latency=0.1, # 100ms bekle
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
futures = []
for siparis in siparisler:
data = json.dumps(siparis).encode("utf-8")
future = publisher.publish(topic_path, data)
futures.append(future)
# Tüm mesajların gönderilmesini bekle
for i, future in enumerate(futures):
try:
mesaj_id = future.result(timeout=30)
print(f"Sipariş {i+1} yayınlandı: {mesaj_id}")
except Exception as e:
print(f"Hata: Sipariş {i+1} gönderilemedi - {e}")
if __name__ == "__main__":
# Tek sipariş testi
siparis_yayinla("SIP-001", "MUS-456", 599.99)
# Toplu test
test_siparisler = [
{"siparis_id": f"SIP-{i:03d}", "tutar": i * 100}
for i in range(1, 11)
]
toplu_siparis_yayinla(test_siparisler)
Subscriber tarafı:
#!/usr/bin/env python3
# subscriber.py - Envanter güncelleme servisi
import json
import time
from google.cloud import pubsub_v1
PROJECT_ID = "proje-id-buraya"
SUBSCRIPTION_ID = "envanter-sub"
def mesaj_isle(message: pubsub_v1.subscriber.message.Message) -> None:
"""Gelen sipariş mesajını işle ve envanter güncelle."""
try:
# Mesaj verisini decode et
data = json.loads(message.data.decode("utf-8"))
attributes = dict(message.attributes)
print(f"Mesaj alındı:")
print(f" ID: {message.message_id}")
print(f" Sipariş ID: {data.get('siparis_id')}")
print(f" Tutar: {data.get('tutar')}")
print(f" Attribute'lar: {attributes}")
print(f" Yayın zamanı: {message.publish_time}")
# İş mantığı burada çalışır
envanter_guncelle(data["siparis_id"])
# Başarıyla işlendiyse acknowledge et
message.ack()
print(f"Mesaj {message.message_id} başarıyla işlendi.")
except KeyError as e:
print(f"Eksik alan: {e} - Mesaj reddediliyor")
# Mesajı nack'le, yeniden teslim edilsin
message.nack()
except Exception as e:
print(f"İşleme hatası: {e}")
message.nack()
def envanter_guncelle(siparis_id: str):
"""Envanter güncelleme simülasyonu."""
time.sleep(0.1) # DB işlemi simülasyonu
print(f"Envanter güncellendi: {siparis_id}")
def abonelik_baslat():
"""Subscription'ı dinlemeye başla."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID, SUBSCRIPTION_ID
)
# Flow control ayarları - aynı anda kaç mesaj işlensin
flow_control = pubsub_v1.types.FlowControl(
max_messages=10, # Aynı anda en fazla 10 mesaj
max_bytes=10 * 1024 * 1024 # En fazla 10MB
)
print(f"Subscription dinleniyor: {subscription_path}")
streaming_pull_future = subscriber.subscribe(
subscription_path,
callback=mesaj_isle,
flow_control=flow_control
)
# Sürekli dinle
with subscriber:
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
print("Abonelik durduruldu.")
if __name__ == "__main__":
abonelik_baslat()
Dead Letter Topic Kurulumu
Bazı mesajlar işlenemez duruma gelir. Subscriber sürekli hata verir, mesaj formatı bozuktur ya da iş mantığında beklenmedik bir durum oluşur. Bu mesajların kaybolmaması için Dead Letter Topic kullanılır.
# Dead letter topic oluştur
gcloud pubsub topics create siparis-dead-letter
# Dead letter subscription (izleme için)
gcloud pubsub subscriptions create siparis-dead-letter-izleme
--topic=siparis-dead-letter
# Pub/Sub servis hesabına gerekli izinleri ver
PROJECT_NUMBER=$(gcloud projects describe $(gcloud config get-value project)
--format="value(projectNumber)")
PUBSUB_SA="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
# Publisher izni (dead letter topic'e yazabilsin)
gcloud pubsub topics add-iam-policy-binding siparis-dead-letter
--member="$PUBSUB_SA"
--role="roles/pubsub.publisher"
# Subscriber izni (orijinal subscription'dan okuyabilsin)
gcloud pubsub subscriptions add-iam-policy-binding envanter-sub
--member="$PUBSUB_SA"
--role="roles/pubsub.subscriber"
# Mevcut subscription'a dead letter policy ekle
# Önce subscription'ı sil ve yeniden oluştur
gcloud pubsub subscriptions delete envanter-sub
gcloud pubsub subscriptions create envanter-sub
--topic=siparis-olusturuldu
--ack-deadline=60
--dead-letter-topic=siparis-dead-letter
--max-delivery-attempts=5
# Dead letter topic'i izle
gcloud pubsub subscriptions pull siparis-dead-letter-izleme
--limit=10
--auto-ack
--max-delivery-attempts: Mesaj kaç kez teslim denemesi yapılsın. Bu sayıyı aştıktan sonra dead letter topic’e taşınır.
IAM ve Güvenlik Yapılandırması
Production ortamında en az yetki prensibini uygulamak kritik önem taşır.
# Servis hesabı oluştur
gcloud iam service-accounts create siparis-publisher
--display-name="Sipariş Publisher Servisi"
gcloud iam service-accounts create envanter-subscriber
--display-name="Envanter Subscriber Servisi"
# Publisher'a sadece publish yetkisi ver
gcloud pubsub topics add-iam-policy-binding siparis-olusturuldu
--member="serviceAccount:siparis-publisher@$(gcloud config get-value project).iam.gserviceaccount.com"
--role="roles/pubsub.publisher"
# Subscriber'a sadece consume yetkisi ver
gcloud pubsub subscriptions add-iam-policy-binding envanter-sub
--member="serviceAccount:envanter-subscriber@$(gcloud config get-value project).iam.gserviceaccount.com"
--role="roles/pubsub.subscriber"
# Servis hesabı anahtarı oluştur (GKE dışında kullanım için)
gcloud iam service-accounts keys create envanter-sub-key.json
--iam-account=envanter-subscriber@$(gcloud config get-value project).iam.gserviceaccount.com
# Mevcut IAM politikasını görüntüle
gcloud pubsub topics get-iam-policy siparis-olusturuldu
GKE üzerinde çalışıyorsanız Workload Identity kullanın, servis hesabı anahtarı dosyası taşımayın. Bu çok daha güvenli bir yaklaşımdır.
Monitoring ve Alerting
Mesajlaşma sisteminin sağlıklı çalışıp çalışmadığını izlemek için Cloud Monitoring entegrasyonunu kurun.
# Pub/Sub metriklerini kontrol et
gcloud monitoring metrics list --filter="metric.type:pubsub"
# Subscription backlog'unu kontrol et (kaç mesaj bekliyor)
gcloud pubsub subscriptions describe envanter-sub
--format="value(pushConfig,ackDeadlineSeconds)"
# Alert policy oluştur - yüksek backlog için
gcloud alpha monitoring policies create
--policy-from-file=pubsub-alert-policy.json
Alert policy için JSON dosyası:
# pubsub-alert-policy.json oluştur
cat > pubsub-alert-policy.json << 'EOF'
{
"displayName": "Pub/Sub Yüksek Backlog Alarmı",
"conditions": [
{
"displayName": "Subscription backlog 1000 mesajı aştı",
"conditionThreshold": {
"filter": "resource.type="pubsub_subscription" AND metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" AND resource.labels.subscription_id="envanter-sub"",
"comparison": "COMPARISON_GT",
"thresholdValue": 1000,
"duration": "300s",
"aggregations": [
{
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN"
}
]
}
}
],
"alertStrategy": {
"autoClose": "604800s"
},
"combiner": "OR",
"enabled": true,
"notificationChannels": []
}
EOF
echo "Alert policy dosyası oluşturuldu."
Düzenli olarak kontrol etmeniz gereken metrikler:
subscription/num_undelivered_messages: Backlog büyüklüğü. Bu sayı sürekli artıyorsa subscriber yetişemiyor demektir.subscription/oldest_unacked_message_age: En eski acknowledge edilmemiş mesajın yaşı.topic/send_message_operation_count: Saniyede gönderilen mesaj sayısı.subscription/dead_letter_message_count: Dead letter topic’e düşen mesaj sayısı.
Gerçek Dünya Senaryosu: Sipariş İşleme Pipeline’ı
Tüm bu bileşenleri bir araya getiren kapsamlı bir kurulum senaryosu:
#!/bin/bash
# setup-messaging.sh - Sipariş işleme mesajlaşma altyapısı kurulumu
set -e
PROJECT=$(gcloud config get-value project)
REGION="europe-west1"
echo "Proje: $PROJECT"
echo "Bölge: $REGION"
# API'yi etkinleştir
gcloud services enable pubsub.googleapis.com
# Topic'leri oluştur
echo "Topic'ler oluşturuluyor..."
TOPICS=("siparis-olusturuldu" "siparis-onaylandi" "kargo-guncellendi" "dead-letter")
for topic in "${TOPICS[@]}"; do
gcloud pubsub topics create "$topic"
--message-retention-duration=7d 2>/dev/null || echo "$topic zaten var"
done
# Subscription'ları oluştur
echo "Subscription'lar oluşturuluyor..."
gcloud pubsub subscriptions create envanter-sub
--topic=siparis-olusturuldu
--ack-deadline=60
--dead-letter-topic=dead-letter
--max-delivery-attempts=5
--message-retention-duration=2d 2>/dev/null || echo "envanter-sub zaten var"
gcloud pubsub subscriptions create fatura-sub
--topic=siparis-olusturuldu
--ack-deadline=120
--dead-letter-topic=dead-letter
--max-delivery-attempts=5 2>/dev/null || echo "fatura-sub zaten var"
gcloud pubsub subscriptions create kargo-sub
--topic=siparis-onaylandi
--ack-deadline=30
--dead-letter-topic=dead-letter
--max-delivery-attempts=3 2>/dev/null || echo "kargo-sub zaten var"
gcloud pubsub subscriptions create dead-letter-izleme
--topic=dead-letter
--ack-deadline=600 2>/dev/null || echo "dead-letter-izleme zaten var"
# Servis hesapları oluştur
echo "Servis hesapları yapılandırılıyor..."
SA_PUBLISHER="siparis-api@${PROJECT}.iam.gserviceaccount.com"
SA_SUBSCRIBER="islem-servisleri@${PROJECT}.iam.gserviceaccount.com"
gcloud iam service-accounts create siparis-api
--display-name="Sipariş API Servisi" 2>/dev/null || true
gcloud iam service-accounts create islem-servisleri
--display-name="İşlem Servisleri" 2>/dev/null || true
# İzinleri ver
gcloud pubsub topics add-iam-policy-binding siparis-olusturuldu
--member="serviceAccount:${SA_PUBLISHER}"
--role="roles/pubsub.publisher"
for sub in envanter-sub fatura-sub kargo-sub; do
gcloud pubsub subscriptions add-iam-policy-binding "$sub"
--member="serviceAccount:${SA_SUBSCRIBER}"
--role="roles/pubsub.subscriber"
done
echo ""
echo "Kurulum tamamlandı!"
echo "Topic'ler: $(gcloud pubsub topics list --format='value(name)' | tr 'n' ', ')"
echo "Subscription'lar: $(gcloud pubsub subscriptions list --format='value(name)' | tr 'n' ', ')"
Maliyet Optimizasyonu
Pub/Sub maliyeti gönderilen ve alınan veri miktarına göre hesaplanır. Birkaç pratik tavsiye:
- Mesajları birleştirin: Tek tek küçük mesajlar göndermek yerine batch kullanın. Hem maliyet düşer hem de throughput artar.
- Gereksiz subscription silişlerim: Kullanılmayan subscription’lar hem maliyet oluşturur hem de backlog biriktirir.
- Mesaj boyutunu optimize edin: JSON yerine Protocol Buffers kullanmak mesaj boyutunu önemli ölçüde küçültür.
- Ordering gereksizse kullanmayın: Message ordering açık olmak gereksiz maliyet getirir.
- Snapshot ve replay dikkatli kullanın: Snapshot alma maliyeti etkileyebilir, gerçekten gerekmedikçe oluşturmayın.
Ücretsiz katman oldukça cömert: ayda ilk 10 GB veri ücretsizdir. Orta ölçekli sistemlerde aylık maliyet genellikle birkaç dolarda kalır.
Sonuç
GCP Pub/Sub, karmaşık dağıtık sistemlerdeki servisler arası iletişim sorununu zarif bir şekilde çözer. Kurulum açısından bakıldığında en önemli adımlar topic ve subscription yapısını doğru tasarlamak, dead letter topic’i mutlaka yapılandırmak, IAM izinlerini en az yetki prensibiyle uygulamak ve backlog metriklerini düzenli izlemektir.
Gerçek bir sistemde başlangıçta her şeyi mükemmel yapılandırmanıza gerek yok. Önce basit bir topic ve pull subscription ile başlayın, sistemin davranışını gözlemleyin, ardından ack deadline ve flow control gibi parametreleri yükünüze göre ayarlayın. Dead letter topic’i ise prodüksiyona çıkmadan mutlaka ekleyin, bu sizi pek çok veri kaybı krizinden kurtarır.
Pub/Sub’ı Dataflow veya Cloud Functions ile birleştirdiğinizde stream processing pipeline’ları kurabilirsiniz. Bu entegrasyonları sonraki yazılarda ele alacağız.
