Webhook ile Asenkron İşlem Kuyruğu Yönetimi

Bir webhook geldiğinde “tamam, aldım” deyip işi hemen bitirmek kulağa hoş gelir. Ama gerçek dünyada işler böyle yürümez. Harici bir servisten gelen webhook isteği, veritabanı yazma, e-posta gönderme, dosya işleme gibi onlarca adımı tetikleyebilir. Tüm bunları webhook handler’ın içinde senkron olarak yapmaya kalkarsan hem timeout alırsın hem de gönderen servise “ben hazır değilim” mesajı vermek zorunda kalırsın. İşte tam bu noktada asenkron işlem kuyruğu devreye giriyor.

Bu yazıda gerçek bir senaryo üzerinden ilerleyeceğiz: Bir e-ticaret platformunun ödeme webhook’larını alması, bu işlemleri kuyruğa atması ve worker’ların kuyruğu işlemesi. Teknoloji olarak Redis tabanlı Celery ve RabbitMQ kombinasyonunu kullanacağız. Arka planda ne döndüğünü anlamak isteyenler için mimariyi adım adım açıklayacağım.

Neden Asenkron Kuyruk?

Webhook gönderen servisler genellikle çok sabırsızdır. Stripe, GitHub, Shopify gibi platformlar webhook endpoint’ine istek gönderir ve birkaç saniye içinde 200 OK bekler. Eğer bu sürede cevap alamazlarsa isteği başarısız sayar ve yeniden dener. Bu yeniden deneme mekanizması bazen 3-4 kez, bazen onlarca kez tekrar edebilir.

Problemi şöyle düşün: Bir ödeme webhook’u geldi. Sen bu webhook içinde müşteriye e-posta göndermeye, stok düşmeye, muhasebe sistemine kayıt atmaya çalışıyorsun. Bu işlemlerin toplamı 8-10 saniye sürüyor. Stripe ise 5 saniye sonra “cevap gelmedi” deyip aynı webhook’u tekrar gönderiyor. Şimdi elinde çift işlenmiş bir ödeme var. Felaket.

Çözüm şu: Webhook geldiğinde sadece veriyi al, kuyruğa at, 200 OK döndür. Worker’lar kuyruktaki işleri asenkron olarak yapsın.

Mimari Genel Bakış

Sistemimizin bileşenleri şöyle:

  • Nginx: Reverse proxy, SSL termination
  • Flask/FastAPI: Webhook endpoint’i, gelen isteği doğrular ve kuyruğa atar
  • Redis veya RabbitMQ: Mesaj kuyruğu
  • Celery Worker: Kuyruktaki işleri işleyen arka plan süreci
  • Flower: Celery monitoring arayüzü
  • PostgreSQL: İşlem logları ve durum takibi

Ortamı Kurma

Önce gerekli paketleri kuralım:

# Python sanal ortamı
python3 -m venv webhook_env
source webhook_env/bin/activate

# Gerekli paketler
pip install flask celery redis sqlalchemy psycopg2-binary 
    flower requests cryptography gunicorn

# Redis kurulumu (Ubuntu/Debian)
apt-get install -y redis-server
systemctl enable redis-server
systemctl start redis-server

# RabbitMQ kurulumu (alternatif broker)
apt-get install -y rabbitmq-server
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

# RabbitMQ yönetim plugin'i
rabbitmq-plugins enable rabbitmq_management

Proje dizin yapısı şöyle olacak:

mkdir -p webhook_system/{app,workers,config,logs}
cd webhook_system
touch app/__init__.py app/webhook.py app/models.py
touch workers/tasks.py config/settings.py
touch celery_app.py docker-compose.yml

Celery Uygulamasını Yapılandırma

# celery_app.py
from celery import Celery
import os

def make_celery():
    app = Celery(
        'webhook_system',
        broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
        include=['workers.tasks']
    )
    
    app.conf.update(
        # Task ayarları
        task_serializer='json',
        result_serializer='json',
        accept_content=['json'],
        
        # Timezone
        timezone='Europe/Istanbul',
        enable_utc=True,
        
        # Retry ayarları
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        
        # Kuyruk yapılandırması
        task_queues={
            'high_priority': {
                'exchange': 'high_priority',
                'routing_key': 'high_priority',
            },
            'default': {
                'exchange': 'default',
                'routing_key': 'default',
            },
            'low_priority': {
                'exchange': 'low_priority',
                'routing_key': 'low_priority',
            }
        },
        task_default_queue='default',
        
        # Rate limiting
        task_annotations={
            'workers.tasks.process_payment': {
                'rate_limit': '100/m'
            }
        },
        
        # Worker ayarları
        worker_prefetch_multiplier=1,
        worker_max_tasks_per_child=200,
        
        # Sonuç saklama süresi
        result_expires=86400,
    )
    
    return app

celery = make_celery()

Webhook Endpoint’i

Webhook endpoint’inin iki kritik görevi var: imza doğrulama ve kuyruğa atma. İmza doğrulamayı atlarsanız herhangi biri sahte webhook gönderebilir.

# app/webhook.py
from flask import Flask, request, jsonify, abort
import hmac
import hashlib
import json
import logging
from datetime import datetime
from celery_app import celery
from workers.tasks import process_payment, process_order_update, send_notification

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

WEBHOOK_SECRETS = {
    'stripe': 'whsec_xxxxxxxxxxxxx',
    'shopify': 'shopify_secret_xxx',
    'github': 'github_secret_xxx'
}

def verify_stripe_signature(payload, signature, secret):
    """Stripe webhook imza doğrulama"""
    try:
        timestamp = None
        signatures = []
        
        for item in signature.split(','):
            key, value = item.split('=', 1)
            if key == 't':
                timestamp = value
            elif key == 'v1':
                signatures.append(value)
        
        if not timestamp or not signatures:
            return False
        
        signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
        expected = hmac.new(
            secret.encode('utf-8'),
            signed_payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return any(hmac.compare_digest(expected, sig) for sig in signatures)
    except Exception as e:
        logger.error(f"İmza doğrulama hatası: {e}")
        return False

@app.route('/webhook/stripe', methods=['POST'])
def stripe_webhook():
    payload = request.get_data()
    signature = request.headers.get('Stripe-Signature')
    
    if not signature:
        logger.warning("Stripe-Signature header eksik")
        abort(400)
    
    if not verify_stripe_signature(payload, signature, WEBHOOK_SECRETS['stripe']):
        logger.warning("Geçersiz Stripe imzası")
        abort(401)
    
    try:
        event = json.loads(payload)
        event_type = event.get('type')
        event_id = event.get('id')
        
        logger.info(f"Stripe webhook alındı: {event_type} - {event_id}")
        
        # Event tipine göre uygun kuyruğa at
        if event_type == 'payment_intent.succeeded':
            task = process_payment.apply_async(
                args=[event],
                queue='high_priority',
                task_id=f"stripe_{event_id}",
                countdown=0
            )
            logger.info(f"Ödeme işlemi kuyruğa eklendi: {task.id}")
            
        elif event_type == 'invoice.payment_failed':
            task = send_notification.apply_async(
                args=[event, 'payment_failed'],
                queue='high_priority',
                retry=True,
                retry_policy={
                    'max_retries': 3,
                    'interval_start': 0,
                    'interval_step': 60,
                    'interval_max': 300,
                }
            )
            
        elif event_type in ['customer.updated', 'customer.deleted']:
            process_order_update.apply_async(
                args=[event],
                queue='default',
                expires=3600
            )
        
        return jsonify({
            'status': 'queued',
            'event_id': event_id,
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat()
        }), 200
        
    except json.JSONDecodeError:
        logger.error("JSON parse hatası")
        abort(400)
    except Exception as e:
        logger.error(f"Webhook işleme hatası: {e}")
        # Yine de 200 dön, yoksa Stripe tekrar dener
        return jsonify({'status': 'error', 'message': str(e)}), 200

Worker Task’ları

Task’ları yazarken en önemli konu idempotency yani aynı işlemi birden fazla kez çalıştırsak bile sonucun değişmemesi. Çünkü kuyruk sistemlerinde bir task birden fazla kez çalışabilir.

# workers/tasks.py
from celery_app import celery
from celery.utils.log import get_task_logger
from datetime import datetime
import requests
import time

logger = get_task_logger(__name__)

@celery.task(
    bind=True,
    name='workers.tasks.process_payment',
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
    autoretry_for=(requests.Timeout, ConnectionError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True
)
def process_payment(self, event_data):
    """
    Ödeme işlemini asenkron olarak gerçekleştirir.
    Idempotent: Aynı payment_id ile birden fazla çalışsa sorun olmaz.
    """
    task_id = self.request.id
    payment_id = event_data.get('data', {}).get('object', {}).get('id')
    
    logger.info(f"Ödeme işleniyor: {payment_id} - Task: {task_id}")
    
    try:
        # 1. Idempotency kontrolü - daha önce işlendi mi?
        if is_already_processed(payment_id):
            logger.info(f"Ödeme zaten işlendi, atlıyorum: {payment_id}")
            return {'status': 'skipped', 'reason': 'already_processed'}
        
        # 2. İşlemi başladı olarak işaretle
        mark_processing(payment_id, task_id)
        
        # 3. Ödeme detaylarını al
        payment_details = extract_payment_details(event_data)
        
        # 4. Veritabanına kaydet
        save_payment_to_db(payment_details)
        
        # 5. Stok güncelle
        update_inventory(payment_details['items'])
        
        # 6. Müşteriye onay e-postası gönder
        send_confirmation_email(payment_details['customer_email'])
        
        # 7. Muhasebe sistemine bildir
        notify_accounting_system(payment_details)
        
        # 8. İşlemi tamamlandı olarak işaretle
        mark_completed(payment_id)
        
        logger.info(f"Ödeme başarıyla işlendi: {payment_id}")
        return {
            'status': 'success',
            'payment_id': payment_id,
            'processed_at': datetime.utcnow().isoformat()
        }
        
    except requests.Timeout as exc:
        logger.warning(f"Timeout hatası, yeniden deneniyor: {payment_id}")
        raise self.retry(exc=exc, countdown=self.default_retry_delay * (self.request.retries + 1))
        
    except Exception as exc:
        logger.error(f"Ödeme işleme hatası: {payment_id} - {exc}")
        if self.request.retries >= self.max_retries:
            # Maksimum deneme sayısına ulaşıldı, dead letter queue'ya gönder
            send_to_dead_letter_queue(event_data, str(exc))
            mark_failed(payment_id, str(exc))
        raise

def is_already_processed(payment_id):
    """Redis üzerinde idempotency kontrolü"""
    import redis
    r = redis.Redis(host='localhost', port=6379, db=2)
    return r.exists(f"processed:payment:{payment_id}")

def mark_processing(payment_id, task_id):
    import redis
    r = redis.Redis(host='localhost', port=6379, db=2)
    r.setex(f"processing:payment:{payment_id}", 3600, task_id)

def mark_completed(payment_id):
    import redis
    r = redis.Redis(host='localhost', port=6379, db=2)
    r.delete(f"processing:payment:{payment_id}")
    # 24 saat boyunca tamamlandı olarak işaretle
    r.setex(f"processed:payment:{payment_id}", 86400, "1")

@celery.task(bind=True, name='workers.tasks.send_notification', max_retries=5)
def send_notification(self, event_data, notification_type):
    """Bildirim gönderme task'ı"""
    customer_id = event_data.get('data', {}).get('object', {}).get('customer')
    
    try:
        if notification_type == 'payment_failed':
            # SMS ve e-posta ile bildirim gönder
            customer = get_customer_details(customer_id)
            
            send_sms(customer['phone'], "Ödemeniz başarısız oldu. Lütfen kart bilgilerinizi kontrol edin.")
            send_email(
                customer['email'],
                subject="Ödeme Başarısız",
                template="payment_failed",
                context={'customer': customer}
            )
            
        return {'status': 'sent', 'customer_id': customer_id}
        
    except Exception as exc:
        logger.error(f"Bildirim gönderilemedi: {exc}")
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))

Dead Letter Queue Yönetimi

Maksimum deneme sayısını aşan task’ları kaybetmemek için dead letter queue kullanıyoruz:

# workers/dead_letter.py
import json
import redis
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

r = redis.Redis(host='localhost', port=6379, db=3)
DLQ_KEY = 'dead_letter_queue'

def send_to_dead_letter_queue(event_data, error_message, metadata=None):
    """Başarısız işlemi dead letter queue'ya ekler"""
    dlq_entry = {
        'event_data': event_data,
        'error': error_message,
        'failed_at': datetime.utcnow().isoformat(),
        'metadata': metadata or {}
    }
    r.lpush(DLQ_KEY, json.dumps(dlq_entry))
    logger.error(f"İşlem DLQ'ya eklendi: {event_data.get('id', 'unknown')}")

def get_dlq_items(count=10):
    """DLQ'dan item listeler"""
    items = r.lrange(DLQ_KEY, 0, count - 1)
    return [json.loads(item) for item in items]

def retry_dlq_item(index):
    """DLQ'dan bir item'ı yeniden dener"""
    from workers.tasks import process_payment
    
    items = r.lrange(DLQ_KEY, index, index)
    if not items:
        return False
    
    item = json.loads(items[0])
    event_data = item['event_data']
    
    # Yeniden kuyruğa at
    process_payment.apply_async(
        args=[event_data],
        queue='default',
        countdown=0
    )
    
    # DLQ'dan kaldır
    r.lrem(DLQ_KEY, 1, items[0])
    logger.info(f"DLQ item yeniden kuyruğa eklendi")
    return True

def drain_dlq():
    """Tüm DLQ'yu yeniden işler"""
    count = r.llen(DLQ_KEY)
    logger.info(f"DLQ'da {count} item var, yeniden işleniyor...")
    
    for i in range(count):
        retry_dlq_item(0)
        import time
        time.sleep(1)  # Rate limiting
    
    logger.info("DLQ draining tamamlandı")

Worker’ları Başlatma ve Yönetme

# Farklı kuyruklar için farklı worker'lar başlat
# High priority worker - daha fazla concurrency
celery -A celery_app worker 
    --queues=high_priority 
    --concurrency=8 
    --prefetch-multiplier=1 
    --loglevel=info 
    --logfile=logs/worker_high.log 
    --hostname=worker_high@%h 
    --detach

# Default queue worker
celery -A celery_app worker 
    --queues=default 
    --concurrency=4 
    --prefetch-multiplier=1 
    --loglevel=info 
    --logfile=logs/worker_default.log 
    --hostname=worker_default@%h 
    --detach

# Low priority worker
celery -A celery_app worker 
    --queues=low_priority 
    --concurrency=2 
    --loglevel=warning 
    --logfile=logs/worker_low.log 
    --hostname=worker_low@%h 
    --detach

# Flower monitoring başlat
celery -A celery_app flower 
    --port=5555 
    --basic_auth=admin:guclu_sifre 
    --url_prefix=flower &

# Kuyruk durumunu kontrol et
celery -A celery_app inspect active
celery -A celery_app inspect reserved
celery -A celery_app inspect stats

# Belirli bir worker'ı güvenli şekilde kapat
celery -A celery_app control shutdown worker_high@hostname

# Tüm worker'lara görev iptal gönder
celery -A celery_app purge

Systemd ile Servis Yönetimi

# /etc/systemd/system/celery-webhook.service
cat > /etc/systemd/system/celery-webhook.service << 'EOF'
[Unit]
Description=Webhook Celery Worker
After=network.target redis.service

[Service]
Type=forking
User=webhook_user
Group=webhook_user
WorkingDirectory=/opt/webhook_system
Environment=CELERY_BROKER_URL=redis://localhost:6379/0
Environment=CELERY_RESULT_BACKEND=redis://localhost:6379/1

ExecStart=/opt/webhook_system/webhook_env/bin/celery 
    multi start 
    worker_high worker_default worker_low 
    -A celery_app 
    --logfile=/opt/webhook_system/logs/%n.log 
    --pidfile=/var/run/celery/%n.pid 
    -Q:worker_high high_priority 
    -Q:worker_default default 
    -Q:worker_low low_priority 
    --concurrency:worker_high=8 
    --concurrency:worker_default=4 
    --concurrency:worker_low=2

ExecStop=/opt/webhook_system/webhook_env/bin/celery 
    multi stopwait 
    worker_high worker_default worker_low 
    --pidfile=/var/run/celery/%n.pid

ExecReload=/opt/webhook_system/webhook_env/bin/celery 
    multi restart 
    worker_high worker_default worker_low 
    -A celery_app 
    --pidfile=/var/run/celery/%n.pid

RuntimeDirectory=celery
RuntimeDirectoryMode=0755

Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

mkdir -p /var/run/celery
chown webhook_user:webhook_user /var/run/celery

systemctl daemon-reload
systemctl enable celery-webhook
systemctl start celery-webhook
systemctl status celery-webhook

Monitoring ve Alerting

Kuyruk yönetiminde monitoring hayati önem taşır. Kuyruk boyutu aniden şişmeye başladıysa ya worker yetersiz kalıyordur ya da bir servis hata veriyordur.

# monitoring/queue_monitor.py
import redis
import json
import requests
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, db=0)

ALERT_THRESHOLDS = {
    'high_priority': 50,
    'default': 200,
    'low_priority': 1000,
    'dead_letter': 10
}

SLACK_WEBHOOK = 'https://hooks.slack.com/services/xxx/yyy/zzz'

def get_queue_lengths():
    """Tüm kuyruk uzunluklarını döner"""
    lengths = {}
    for queue_name in ALERT_THRESHOLDS.keys():
        if queue_name == 'dead_letter':
            lengths[queue_name] = r.llen('dead_letter_queue')
        else:
            lengths[queue_name] = r.llen(queue_name)
    return lengths

def send_slack_alert(queue_name, current_length, threshold):
    message = {
        "text": f":warning: *Webhook Kuyruk Uyarısı*",
        "attachments": [{
            "color": "danger",
            "fields": [
                {"title": "Kuyruk", "value": queue_name, "short": True},
                {"title": "Mevcut Boyut", "value": str(current_length), "short": True},
                {"title": "Eşik", "value": str(threshold), "short": True},
                {"title": "Zaman", "value": datetime.utcnow().isoformat(), "short": True}
            ]
        }]
    }
    
    try:
        requests.post(SLACK_WEBHOOK, json=message, timeout=5)
    except Exception as e:
        print(f"Slack alert gönderilemedi: {e}")

def check_and_alert():
    lengths = get_queue_lengths()
    
    for queue_name, length in lengths.items():
        threshold = ALERT_THRESHOLDS[queue_name]
        if length > threshold:
            send_slack_alert(queue_name, length, threshold)
            print(f"ALERT: {queue_name} kuyruğu {length} item (eşik: {threshold})")
        else:
            print(f"OK: {queue_name} = {length} item")

if __name__ == '__main__':
    check_and_alert()

Bu monitoring script’ini cron ile her 5 dakikada çalıştır:

# crontab -e
*/5 * * * * /opt/webhook_system/webhook_env/bin/python /opt/webhook_system/monitoring/queue_monitor.py >> /opt/webhook_system/logs/monitor.log 2>&1

Nginx Yapılandırması

# /etc/nginx/sites-available/webhook
cat > /etc/nginx/sites-available/webhook << 'EOF'
upstream webhook_app {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name webhooks.sirketim.com;

    ssl_certificate /etc/letsencrypt/live/webhooks.sirketim.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/webhooks.sirketim.com/privkey.pem;

    # Rate limiting - aynı IP'den çok fazla istek gelmesin
    limit_req_zone $binary_remote_addr zone=webhook_limit:10m rate=50r/m;
    limit_req zone=webhook_limit burst=20 nodelay;

    location /webhook/ {
        proxy_pass http://webhook_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # Timeout ayarları - webhook'ların hızlı cevap alması lazım
        proxy_connect_timeout 5s;
        proxy_read_timeout 10s;
        proxy_send_timeout 10s;
        
        # Body size limiti
        client_max_body_size 1M;
    }
    
    # Flower monitoring sadece iç ağdan erişilebilsin
    location /flower/ {
        allow 10.0.0.0/8;
        allow 192.168.0.0/16;
        deny all;
        
        proxy_pass http://127.0.0.1:5555/;
        proxy_set_header Host $host;
    }
}
EOF

ln -s /etc/nginx/sites-available/webhook /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx

Yaygın Sorunlar ve Çözümleri

Pratikte en çok karşılaştığın sorunlardan biri worker’ların çökmesi ve kuyruktaki işlerin kaybolması. Bunun için acks_late=True ve task_reject_on_worker_lost=True ayarlarını mutlaka kullan. Bu ayarlarla task, worker onu başarıyla tamamlamadan önce acknowledge edilmez. Worker çökerse task kuyruğa geri döner.

Kuyruk şişmesi de sık karşılaşılan bir durum. Bir servise bağımlı task’lar o servis çöktüğünde birikmeye başlar. Bunun için task_time_limit ve task_soft_time_limit ayarlarını kullan. Soft limit geldiğinde task’a temizlik yapması için zaman tanınır, hard limit geldiğinde zorla öldürülür.

Duplicate task sorunu için idempotency key kullanımını başından tasarla. Her event’in benzersiz ID’sini Redis’te 24 saat tutmak genellikle yeterlidir.

Memory leak sorunları uzun çalışan worker’larda ortaya çıkabilir. worker_max_tasks_per_child=200 ile her worker belirli sayıda task işledikten sonra yeniden başlatılır.

Sonuç

Webhook ile asenkron kuyruk yönetimi, production ortamında kararlı bir entegrasyon altyapısı kurmanın temelidir. Özetlersek:

  • Webhook endpoint’i sadece doğrulama ve kuyruğa ekleme yapmalı, iş mantığı worker’da olmalı
  • İmza doğrulamasını asla atlama, bu güvenliğin ilk katmanı
  • Task’larını idempotent yaz, aynı işin iki kez çalışması sonucu değiştirmemeli
  • Farklı öncelikli işler için farklı kuyruklar ve worker pool’ları kullan
  • Dead letter queue ile başarısız işlemleri kaybetme
  • Kuyruk boyutlarını monitör et ve eşik aşıldığında anında uyarı al
  • Systemd ile worker’ları servis olarak çalıştır, her çöküşte otomatik ayağa kalksın

Bu yapıyı kurduğunda hem gelen webhook’lara anında cevap verebilir hem de arka planda tüm iş mantığını güvenle çalıştırabilirsin. Harici servisler “bu adam güvenilir, webhook’larımı işliyor” der ve mutlu olurlar. Sen de gece saat 3’te “ödeme webhook’ları işlenmiyor” diye uyandırılmazsın.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir