GCP Cloud Functions ile Pub/Sub Entegrasyonu: Olay Güdümlü Mimari Rehberi

Serverless dünyasına adım attığınızda, ilk başta her şey sihir gibi görünür. Kod yazarsın, deploy edersin, çalışır. Ama işin içine girince, özellikle event-driven sistemler kurmaya başlayınca, Pub/Sub gibi mesaj kuyruğu servisleriyle entegrasyon kaçınılmaz hale gelir. GCP’de Cloud Functions ile Pub/Sub’ı birleştirmek, hem güçlü hem de bazen sinir bozucu olabilen bir deneyim. Bu yazıda gerçek dünyada karşılaşacağın senaryolar üzerinden gidip, sık yapılan hataları da atlayarak konuyu derinlemesine ele alacağız.

Pub/Sub Nedir ve Neden Cloud Functions ile Kullanırız?

Google Cloud Pub/Sub, publisher-subscriber modelinde çalışan, tamamen yönetilen bir mesajlaşma servisi. Temel mantık şu: bir uygulama mesaj yayınlar (publish), başka bir uygulama o mesajı dinler (subscribe). Arada ne kadar trafik olursa olsun, Pub/Sub bunu halleder.

Cloud Functions ile bu ikiliyi kullanmanın asıl güzelliği şu: bir tetikleyici geldiğinde fonksiyonun uyanması, işini yapması ve kaybolması. Sürekli çalışan bir servis yok, idle maliyeti yok. E-ticaret sitenizde her sipariş oluştuğunda fatura oluşturman, bir log stream’ini işlemen, veya IoT cihazlarından gelen veriyi parçalayıp farklı servislere iletmen gerektiğinde bu kombinasyon biçilmiş kaftan.

Tipik kullanım senaryoları şunlar:

  • Asenkron iş akışları: Kullanıcı kayıt olduğunda e-posta gönder, profil oluştur, audit log yaz… Bunların hepsini senkron yapmak yerine Pub/Sub’a mesaj at, her adım kendi fonksiyonunda çalışsın.
  • Mikroservis iletişimi: Servisler birbirini direkt çağırmak yerine Pub/Sub üzerinden haberleşir. Bir servis çökse bile mesajlar kaybolmaz.
  • Log ve metrik toplama: Farklı sistemlerden gelen logları merkezi bir yerde toplamak için mükemmel.
  • Batch işlemlerin tetiklenmesi: Cloud Scheduler ile Pub/Sub’a mesaj gönder, fonksiyon uyansın ve işlemi başlatsın.

Ortamı Hazırlama

Önce GCP projenizde gerekli API’leri aktifleştirmemiz gerekiyor. Bu adımı atlayanlar sonradan saatlerce hata ayıklar.

# Gerekli API'leri aktifleştir
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable pubsub.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable artifactregistry.googleapis.com

# Proje ID'ni ortam değişkenine ata
export PROJECT_ID=$(gcloud config get-value project)
export REGION="europe-west1"

Şimdi Pub/Sub topic ve subscription oluşturalım:

# Topic oluştur
gcloud pubsub topics create siparis-olaylari

# Subscription oluştur (pull için)
gcloud pubsub subscriptions create siparis-olaylari-sub 
  --topic=siparis-olaylari 
  --ack-deadline=60 
  --message-retention-duration=7d

# Topic'i listele, doğru oluştu mu kontrol et
gcloud pubsub topics list

--ack-deadline parametresi kritik. Fonksiyonun mesajı işlemesi için kaç saniyesi var? 60 saniye genelde yeterli ama ağır işlemler için bunu artırman gerekebilir. --message-retention-duration ise işlenemeyen mesajların ne kadar süre tutulacağını belirliyor, 7 gün makul bir değer.

İlk Cloud Function: Pub/Sub Tetiklemeli

Şimdi gerçek işe gelelim. Python 3.11 ile basit bir Cloud Function yazalım. Bu fonksiyon Pub/Sub’dan mesaj aldığında sipariş verilerini işleyecek.

Proje yapısı şöyle olacak:

mkdir siparis-processor && cd siparis-processor
touch main.py requirements.txt

requirements.txt dosyası:

cat > requirements.txt << 'EOF'
google-cloud-pubsub==2.18.4
google-cloud-firestore==2.13.1
functions-framework==3.4.0
EOF

Şimdi main.py dosyasını yazalım:

cat > main.py << 'EOF'
import base64
import json
import logging
import functions_framework
from google.cloud import firestore
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@functions_framework.cloud_event
def siparis_isle(cloud_event):
    """Pub/Sub'dan gelen sipariş olayını işler."""
    
    # Pub/Sub mesajını decode et
    pubsub_message = cloud_event.data["message"]
    
    if "data" not in pubsub_message:
        logger.error("Mesajda 'data' alanı bulunamadı")
        return
    
    # Base64 decode işlemi
    try:
        mesaj_verisi = base64.b64decode(pubsub_message["data"]).decode("utf-8")
        siparis = json.loads(mesaj_verisi)
    except (ValueError, json.JSONDecodeError) as e:
        logger.error(f"Mesaj decode hatası: {e}")
        # Hatalı mesajları acknowledge et, yoksa sonsuz döngüye girer
        return
    
    logger.info(f"Sipariş alındı: {siparis.get('siparis_id', 'bilinmiyor')}")
    
    # Sipariş verilerini doğrula
    gerekli_alanlar = ["siparis_id", "musteri_id", "toplam_tutar", "urunler"]
    for alan in gerekli_alanlar:
        if alan not in siparis:
            logger.error(f"Eksik alan: {alan}")
            return
    
    # Firestore'a kaydet
    try:
        db = firestore.Client()
        doc_ref = db.collection("siparisler").document(siparis["siparis_id"])
        doc_ref.set({
            **siparis,
            "isleme_zamani": datetime.utcnow().isoformat(),
            "durum": "islendi"
        })
        logger.info(f"Sipariş Firestore'a kaydedildi: {siparis['siparis_id']}")
    except Exception as e:
        logger.error(f"Firestore kayıt hatası: {e}")
        # Burada raise yaparsak Pub/Sub mesajı retry eder
        raise
EOF

Fonksiyonu deploy edelim:

gcloud functions deploy siparis-isle 
  --gen2 
  --runtime=python311 
  --region=$REGION 
  --source=. 
  --entry-point=siparis_isle 
  --trigger-topic=siparis-olaylari 
  --memory=256MB 
  --timeout=120s 
  --min-instances=0 
  --max-instances=10 
  --set-env-vars=ENVIRONMENT=production

--gen2 parametresini mutlaka kullanın. Cloud Functions 2. nesil, Cloud Run üzerinde çalışır ve çok daha iyi performans ile concurrency desteği sunar.

Test Mesajı Gönderme

Fonksiyonu test etmek için Pub/Sub’a mesaj gönderelim:

# Test sipariş verisi oluştur
SIPARIS_VERISI=$(echo '{
  "siparis_id": "SPR-2024-001",
  "musteri_id": "MST-456",
  "toplam_tutar": 299.99,
  "para_birimi": "TRY",
  "urunler": [
    {"urun_id": "U001", "adet": 2, "fiyat": 149.99}
  ]
}' | base64)

# Pub/Sub'a mesaj gönder
gcloud pubsub topics publish siparis-olaylari 
  --message="$SIPARIS_VERISI" 
  --attribute="kaynak=test,oncelik=yuksek"

# Fonksiyon loglarını takip et
gcloud functions logs read siparis-isle 
  --region=$REGION 
  --gen2 
  --limit=50

Logları izlerken birkaç dakika beklemeniz gerekebilir. Cloud Functions cold start süresi, özellikle min-instances 0 ise, ilk çağrıda birkaç saniye alabilir.

Dead Letter Topic Kurulumu

Gerçek prodüksiyon ortamında işlenemeyen mesajlar için mutlaka Dead Letter Topic (DLT) kurmalısınız. Aksi halde başarısız mesajlar sonsuza kadar retry eder ve fonksiyonunuz sürekli hata üretir.

# Dead letter topic oluştur
gcloud pubsub topics create siparis-olaylari-dlq

# Mevcut subscription'ı dead letter ile güncelle
gcloud pubsub subscriptions modify-push-config siparis-olaylari-sub 
  --dead-letter-topic=siparis-olaylari-dlq 
  --max-delivery-attempts=5

# DLQ için monitoring subscription
gcloud pubsub subscriptions create siparis-dlq-monitor 
  --topic=siparis-olaylari-dlq 
  --message-retention-duration=14d

# Pub/Sub'ın DLQ'ya yazabilmesi için IAM izni ver
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format='value(projectNumber)')
PUBSUB_SA="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding siparis-olaylari-dlq 
  --member="serviceAccount:${PUBSUB_SA}" 
  --role="roles/pubsub.publisher"

gcloud pubsub subscriptions add-iam-policy-binding siparis-olaylari-sub 
  --member="serviceAccount:${PUBSUB_SA}" 
  --role="roles/pubsub.subscriber"

5 denemeden sonra hala işlenemeyen mesajlar DLQ’ya düşer. Bu mesajları ayrı bir fonksiyon veya manuel süreçle işleyebilirsiniz.

Gerçek Dünya Senaryosu: Çoklu Topic ile Fan-Out Pattern

E-ticaret senaryosunu genişletelim. Bir sipariş geldiğinde birden fazla işlem yapılması gerekiyor: fatura oluştur, kargo bildir, müşteriye e-posta gönder, stok güncelle. Bunları tek bir fonksiyonda yapmak hem yönetilmesi zor hem de hata yönetimi kabusa dönüyor.

Fan-out pattern’de ana topic’e gelen mesaj, birden fazla fonksiyonu tetikler. Pub/Sub’da birden fazla subscription aynı topic’i dinleyebilir.

# Her işlem için ayrı subscription oluştur
gcloud pubsub subscriptions create fatura-sub 
  --topic=siparis-olaylari 
  --ack-deadline=60

gcloud pubsub subscriptions create kargo-sub 
  --topic=siparis-olaylari 
  --ack-deadline=60

gcloud pubsub subscriptions create email-sub 
  --topic=siparis-olaylari 
  --ack-deadline=60

# Fatura fonksiyonunu deploy et
gcloud functions deploy fatura-olustur 
  --gen2 
  --runtime=python311 
  --region=$REGION 
  --source=./fatura-service 
  --entry-point=fatura_olustur 
  --trigger-topic=siparis-olaylari 
  --memory=512MB 
  --timeout=300s

# Kargo bildirim fonksiyonu
gcloud functions deploy kargo-bildir 
  --gen2 
  --runtime=python311 
  --region=$REGION 
  --source=./kargo-service 
  --entry-point=kargo_bildir 
  --trigger-topic=siparis-olaylari 
  --memory=256MB 
  --timeout=120s

Bu yapıda dikkat edilmesi gereken nokta şu: her fonksiyon bağımsız başarısız olabilir ve kendi retry mekanizması vardır. Fatura oluşturma başarısız olsa bile kargo bildirimi etkilenmez.

Attribute Bazlı Filtreleme

Tüm fonksiyonların her mesajı işlemesini istemiyorsunuz. Sadece belirli koşullardaki siparişleri filtreleyelim. Pub/Sub subscription filter özelliği tam burada devreye giriyor.

# Sadece "express" kargo tipindeki siparişler için subscription
gcloud pubsub subscriptions create express-kargo-sub 
  --topic=siparis-olaylari 
  --filter='attributes.kargo_tipi = "express"' 
  --ack-deadline=60 
  --message-retention-duration=1d

# Express kargo fonksiyonu bu subscription'ı dinleyecek
# Mesaj gönderirken attribute ekle
gcloud pubsub topics publish siparis-olaylari 
  --message="$(echo '{"siparis_id":"SPR-002","acil":true}' | base64)" 
  --attribute="kargo_tipi=express,oncelik=yuksek"

Filtreleme Pub/Sub tarafında yapıldığı için fonksiyonunuz gereksiz mesajları hiç almaz, bu da maliyeti ve işlem yükünü düşürür.

Monitoring ve Alerting

Prodüksiyonda bir şeyler çalışıyor diye inanmak yetmez, bunu izlemeniz lazım. Pub/Sub ve Cloud Functions için kritik metrikler şunlar:

  • subscription/num_undelivered_messages: İşlenmeyi bekleyen mesaj sayısı. Bu artmaya başlarsa fonksiyonunuz ayak uyduramıyor demektir.
  • subscription/oldest_unacked_message_age: En eski işlenmemiş mesajın yaşı. Bu 5 dakikayı geçiyorsa alarm kurulmalı.
  • function/execution_count: Fonksiyon çalışma sayısı.
  • function/execution_times: Çalışma süreleri.
# Metrik bazlı alert policy oluştur
gcloud alpha monitoring policies create 
  --notification-channels="projects/$PROJECT_ID/notificationChannels/KANAL_ID" 
  --display-name="Pub/Sub Mesaj Birikimi Alarmı" 
  --condition-display-name="Unacked mesaj sayısı 1000 üstü" 
  --condition-filter='resource.type="pubsub_subscription" AND resource.labels.subscription_id="siparis-olaylari-sub" AND metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"' 
  --condition-threshold-value=1000 
  --condition-threshold-comparison=COMPARISON_GT 
  --condition-duration=300s

# Fonksiyon hata oranını izle
gcloud logging metrics create cloud-function-hatalari 
  --description="Cloud Function hata sayacı" 
  --log-filter='resource.type="cloud_function" AND severity>=ERROR AND resource.labels.function_name="siparis-isle"'

Idempotency: Aynı Mesajı İki Kez İşleme Sorunu

Bu konu sysadmin bloglarında çok az ele alınır ama prodüksiyonda başınızı en çok ağrıtacak konudur. Pub/Sub at-least-once delivery garantisi verir. Yani aynı mesaj nadiren de olsa iki kez gelebilir. Fonksiyonunuz idempotent değilse, aynı siparişten iki fatura oluşturabilirsiniz.

Çözüm basit ama disiplin ister:

cat > main.py << 'EOF'
import base64
import json
import logging
import functions_framework
from google.cloud import firestore
from datetime import datetime

logger = logging.getLogger(__name__)

@functions_framework.cloud_event
def siparis_isle(cloud_event):
    pubsub_message = cloud_event.data["message"]
    mesaj_id = pubsub_message.get("messageId", "bilinmiyor")
    
    try:
        mesaj_verisi = base64.b64decode(pubsub_message["data"]).decode("utf-8")
        siparis = json.loads(mesaj_verisi)
    except (ValueError, json.JSONDecodeError) as e:
        logger.error(f"Decode hatası: {e}")
        return
    
    db = firestore.Client()
    
    # Idempotency kontrolü: Bu mesaj daha önce işlendi mi?
    isleme_ref = db.collection("islenen_mesajlar").document(mesaj_id)
    
    @firestore.transactional
    def idempotent_isle(transaction):
        snapshot = isleme_ref.get(transaction=transaction)
        
        if snapshot.exists:
            logger.info(f"Mesaj zaten işlenmiş: {mesaj_id}, atlanıyor")
            return False
        
        # Hem mesajı işlenmiş olarak işaretle hem de siparişi kaydet
        siparis_ref = db.collection("siparisler").document(siparis["siparis_id"])
        transaction.set(isleme_ref, {
            "isleme_zamani": datetime.utcnow().isoformat(),
            "siparis_id": siparis["siparis_id"]
        })
        transaction.set(siparis_ref, {
            **siparis,
            "isleme_zamani": datetime.utcnow().isoformat(),
            "durum": "islendi"
        })
        return True
    
    transaction = db.transaction()
    sonuc = idempotent_isle(transaction)
    
    if sonuc:
        logger.info(f"Sipariş başarıyla işlendi: {siparis['siparis_id']}")
EOF

Firestore transaction kullanarak atomik bir şekilde hem mesajı işlenmiş olarak işaretliyoruz hem de asıl işlemi yapıyoruz. İki eşzamanlı fonksiyon instance’ı aynı mesajı almaya çalışsa bile biri kazanacak ve diğeri güvenli şekilde atlayacak.

Terraform ile Altyapıyı Kod Olarak Yönetmek

Elle gcloud komutları çalıştırmak başlangıç için iyi, ama prodüksiyonda her şey Infrastructure as Code olmalı. Minimal bir Terraform örneği:

cat > main.tf << 'EOF'
resource "google_pubsub_topic" "siparis_olaylari" {
  name    = "siparis-olaylari"
  project = var.project_id

  message_storage_policy {
    allowed_persistence_regions = ["europe-west1"]
  }
}

resource "google_pubsub_topic" "siparis_dlq" {
  name    = "siparis-olaylari-dlq"
  project = var.project_id
}

resource "google_pubsub_subscription" "siparis_sub" {
  name    = "siparis-olaylari-sub"
  topic   = google_pubsub_topic.siparis_olaylari.name
  project = var.project_id

  ack_deadline_seconds       = 60
  message_retention_duration = "604800s"  # 7 gün

  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.siparis_dlq.id
    max_delivery_attempts = 5
  }

  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }
}

resource "google_cloudfunctions2_function" "siparis_isle" {
  name     = "siparis-isle"
  location = var.region
  project  = var.project_id

  build_config {
    runtime     = "python311"
    entry_point = "siparis_isle"
    source {
      storage_source {
        bucket = google_storage_bucket.functions_bucket.name
        object = google_storage_bucket_object.function_source.name
      }
    }
  }

  service_config {
    min_instance_count = 0
    max_instance_count = 10
    available_memory   = "256M"
    timeout_seconds    = 120
  }

  event_trigger {
    trigger_region        = var.region
    event_type            = "google.cloud.pubsub.topic.v1.messagePublished"
    pubsub_topic          = google_pubsub_topic.siparis_olaylari.id
    retry_policy          = "RETRY_POLICY_RETRY"
  }
}
EOF
# Terraform ile deploy
terraform init
terraform plan -var="project_id=$PROJECT_ID" -var="region=$REGION"
terraform apply -var="project_id=$PROJECT_ID" -var="region=$REGION" -auto-approve

Sık Karşılaşılan Hatalar ve Çözümleri

“Permission denied on Pub/Sub” hatası: Cloud Functions servis hesabının Pub/Sub’a erişim izni yok demektir.

# Cloud Functions servis hesabına Pub/Sub subscriber rolü ver
gcloud projects add-iam-policy-binding $PROJECT_ID 
  --member="serviceAccount:[email protected]" 
  --role="roles/pubsub.subscriber"

Cold start süresi çok uzun: Python için cold start optimize etmek istiyorsanız global scope’da yapılan işlemleri minimuma indirin. Firestore client’ı her istekte değil, modül yüklenirken oluşturun.

Mesajlar işleniyor ama Firestore’a yazılmıyor: Muhtemelen timeout süresi aşılıyor. Timeout değerini artırın veya işlemi daha küçük parçalara bölün.

Retry storm: Fonksiyon sürekli hata verince Pub/Sub sürekli retry eder. Dead letter topic kurulmamışsa bu döngü sonsuza gidebilir. Her zaman DLQ kurun ve hata fırlatan kodunuzda exponential backoff mantığı olduğundan emin olun.

Sonuç

GCP Cloud Functions ile Pub/Sub entegrasyonu, doğru yapılandırıldığında son derece güvenilir ve ölçeklenebilir sistemler kurmanıza olanak tanıyor. Bu yazıda ele aldığımız konuları özetlersek:

  • Temel kurulum ve topic/subscription yönetimi
  • Cloud Functions 2. nesil ile Pub/Sub event trigger
  • Dead Letter Topic ile hata yönetimi
  • Fan-out pattern ile mikroservis mimarisi
  • Attribute filtreleme ile akıllı mesaj yönlendirme
  • Idempotency ile çift işleme sorununun önlenmesi
  • Terraform ile altyapının kod olarak yönetimi

Prodüksiyona geçmeden önce mutlaka şu üç şeyi yapın: Dead letter topic kurun, idempotency mekanizması ekleyin ve kritik metrikler için alert policy tanımlayın. Bu üçü olmadan serverless güzel görünür ama gece 2’de sizi uyandırabilir.

Sistemin davranışını anlamak için başlangıçta daha küçük --max-instances değerleriyle çalışın ve ölçeklendirmeyi gözlemleyin. Cloud Functions otomatik ölçekler ama her ölçekleme ücretsiz değil, Pub/Sub mesaj hacmine göre maliyetinizi önceden tahmin edin.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir