Webhook ile Asenkron İşlem Kuyruğu Yönetimi
Bir webhook geldiğinde “tamam, aldım” deyip işi hemen bitirmek kulağa hoş gelir. Ama gerçek dünyada işler böyle yürümez. Harici bir servisten gelen webhook isteği, veritabanı yazma, e-posta gönderme, dosya işleme gibi onlarca adımı tetikleyebilir. Tüm bunları webhook handler’ın içinde senkron olarak yapmaya kalkarsan hem timeout alırsın hem de gönderen servise “ben hazır değilim” mesajı vermek zorunda kalırsın. İşte tam bu noktada asenkron işlem kuyruğu devreye giriyor.
Bu yazıda gerçek bir senaryo üzerinden ilerleyeceğiz: Bir e-ticaret platformunun ödeme webhook’larını alması, bu işlemleri kuyruğa atması ve worker’ların kuyruğu işlemesi. Teknoloji olarak Redis tabanlı Celery ve RabbitMQ kombinasyonunu kullanacağız. Arka planda ne döndüğünü anlamak isteyenler için mimariyi adım adım açıklayacağım.
Neden Asenkron Kuyruk?
Webhook gönderen servisler genellikle çok sabırsızdır. Stripe, GitHub, Shopify gibi platformlar webhook endpoint’ine istek gönderir ve birkaç saniye içinde 200 OK bekler. Eğer bu sürede cevap alamazlarsa isteği başarısız sayar ve yeniden dener. Bu yeniden deneme mekanizması bazen 3-4 kez, bazen onlarca kez tekrar edebilir.
Problemi şöyle düşün: Bir ödeme webhook’u geldi. Sen bu webhook içinde müşteriye e-posta göndermeye, stok düşmeye, muhasebe sistemine kayıt atmaya çalışıyorsun. Bu işlemlerin toplamı 8-10 saniye sürüyor. Stripe ise 5 saniye sonra “cevap gelmedi” deyip aynı webhook’u tekrar gönderiyor. Şimdi elinde çift işlenmiş bir ödeme var. Felaket.
Çözüm şu: Webhook geldiğinde sadece veriyi al, kuyruğa at, 200 OK döndür. Worker’lar kuyruktaki işleri asenkron olarak yapsın.
Mimari Genel Bakış
Sistemimizin bileşenleri şöyle:
- Nginx: Reverse proxy, SSL termination
- Flask/FastAPI: Webhook endpoint’i, gelen isteği doğrular ve kuyruğa atar
- Redis veya RabbitMQ: Mesaj kuyruğu
- Celery Worker: Kuyruktaki işleri işleyen arka plan süreci
- Flower: Celery monitoring arayüzü
- PostgreSQL: İşlem logları ve durum takibi
Ortamı Kurma
Önce gerekli paketleri kuralım:
# Python sanal ortamı
python3 -m venv webhook_env
source webhook_env/bin/activate
# Gerekli paketler
pip install flask celery redis sqlalchemy psycopg2-binary
flower requests cryptography gunicorn
# Redis kurulumu (Ubuntu/Debian)
apt-get install -y redis-server
systemctl enable redis-server
systemctl start redis-server
# RabbitMQ kurulumu (alternatif broker)
apt-get install -y rabbitmq-server
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
# RabbitMQ yönetim plugin'i
rabbitmq-plugins enable rabbitmq_management
Proje dizin yapısı şöyle olacak:
mkdir -p webhook_system/{app,workers,config,logs}
cd webhook_system
touch app/__init__.py app/webhook.py app/models.py
touch workers/tasks.py config/settings.py
touch celery_app.py docker-compose.yml
Celery Uygulamasını Yapılandırma
# celery_app.py
from celery import Celery
import os
def make_celery():
app = Celery(
'webhook_system',
broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
include=['workers.tasks']
)
app.conf.update(
# Task ayarları
task_serializer='json',
result_serializer='json',
accept_content=['json'],
# Timezone
timezone='Europe/Istanbul',
enable_utc=True,
# Retry ayarları
task_acks_late=True,
task_reject_on_worker_lost=True,
# Kuyruk yapılandırması
task_queues={
'high_priority': {
'exchange': 'high_priority',
'routing_key': 'high_priority',
},
'default': {
'exchange': 'default',
'routing_key': 'default',
},
'low_priority': {
'exchange': 'low_priority',
'routing_key': 'low_priority',
}
},
task_default_queue='default',
# Rate limiting
task_annotations={
'workers.tasks.process_payment': {
'rate_limit': '100/m'
}
},
# Worker ayarları
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=200,
# Sonuç saklama süresi
result_expires=86400,
)
return app
celery = make_celery()
Webhook Endpoint’i
Webhook endpoint’inin iki kritik görevi var: imza doğrulama ve kuyruğa atma. İmza doğrulamayı atlarsanız herhangi biri sahte webhook gönderebilir.
# app/webhook.py
from flask import Flask, request, jsonify, abort
import hmac
import hashlib
import json
import logging
from datetime import datetime
from celery_app import celery
from workers.tasks import process_payment, process_order_update, send_notification
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
WEBHOOK_SECRETS = {
'stripe': 'whsec_xxxxxxxxxxxxx',
'shopify': 'shopify_secret_xxx',
'github': 'github_secret_xxx'
}
def verify_stripe_signature(payload, signature, secret):
"""Stripe webhook imza doğrulama"""
try:
timestamp = None
signatures = []
for item in signature.split(','):
key, value = item.split('=', 1)
if key == 't':
timestamp = value
elif key == 'v1':
signatures.append(value)
if not timestamp or not signatures:
return False
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
expected = hmac.new(
secret.encode('utf-8'),
signed_payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
return any(hmac.compare_digest(expected, sig) for sig in signatures)
except Exception as e:
logger.error(f"İmza doğrulama hatası: {e}")
return False
@app.route('/webhook/stripe', methods=['POST'])
def stripe_webhook():
payload = request.get_data()
signature = request.headers.get('Stripe-Signature')
if not signature:
logger.warning("Stripe-Signature header eksik")
abort(400)
if not verify_stripe_signature(payload, signature, WEBHOOK_SECRETS['stripe']):
logger.warning("Geçersiz Stripe imzası")
abort(401)
try:
event = json.loads(payload)
event_type = event.get('type')
event_id = event.get('id')
logger.info(f"Stripe webhook alındı: {event_type} - {event_id}")
# Event tipine göre uygun kuyruğa at
if event_type == 'payment_intent.succeeded':
task = process_payment.apply_async(
args=[event],
queue='high_priority',
task_id=f"stripe_{event_id}",
countdown=0
)
logger.info(f"Ödeme işlemi kuyruğa eklendi: {task.id}")
elif event_type == 'invoice.payment_failed':
task = send_notification.apply_async(
args=[event, 'payment_failed'],
queue='high_priority',
retry=True,
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 60,
'interval_max': 300,
}
)
elif event_type in ['customer.updated', 'customer.deleted']:
process_order_update.apply_async(
args=[event],
queue='default',
expires=3600
)
return jsonify({
'status': 'queued',
'event_id': event_id,
'event_type': event_type,
'timestamp': datetime.utcnow().isoformat()
}), 200
except json.JSONDecodeError:
logger.error("JSON parse hatası")
abort(400)
except Exception as e:
logger.error(f"Webhook işleme hatası: {e}")
# Yine de 200 dön, yoksa Stripe tekrar dener
return jsonify({'status': 'error', 'message': str(e)}), 200
Worker Task’ları
Task’ları yazarken en önemli konu idempotency yani aynı işlemi birden fazla kez çalıştırsak bile sonucun değişmemesi. Çünkü kuyruk sistemlerinde bir task birden fazla kez çalışabilir.
# workers/tasks.py
from celery_app import celery
from celery.utils.log import get_task_logger
from datetime import datetime
import requests
import time
logger = get_task_logger(__name__)
@celery.task(
bind=True,
name='workers.tasks.process_payment',
max_retries=3,
default_retry_delay=60,
acks_late=True,
autoretry_for=(requests.Timeout, ConnectionError),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True
)
def process_payment(self, event_data):
"""
Ödeme işlemini asenkron olarak gerçekleştirir.
Idempotent: Aynı payment_id ile birden fazla çalışsa sorun olmaz.
"""
task_id = self.request.id
payment_id = event_data.get('data', {}).get('object', {}).get('id')
logger.info(f"Ödeme işleniyor: {payment_id} - Task: {task_id}")
try:
# 1. Idempotency kontrolü - daha önce işlendi mi?
if is_already_processed(payment_id):
logger.info(f"Ödeme zaten işlendi, atlıyorum: {payment_id}")
return {'status': 'skipped', 'reason': 'already_processed'}
# 2. İşlemi başladı olarak işaretle
mark_processing(payment_id, task_id)
# 3. Ödeme detaylarını al
payment_details = extract_payment_details(event_data)
# 4. Veritabanına kaydet
save_payment_to_db(payment_details)
# 5. Stok güncelle
update_inventory(payment_details['items'])
# 6. Müşteriye onay e-postası gönder
send_confirmation_email(payment_details['customer_email'])
# 7. Muhasebe sistemine bildir
notify_accounting_system(payment_details)
# 8. İşlemi tamamlandı olarak işaretle
mark_completed(payment_id)
logger.info(f"Ödeme başarıyla işlendi: {payment_id}")
return {
'status': 'success',
'payment_id': payment_id,
'processed_at': datetime.utcnow().isoformat()
}
except requests.Timeout as exc:
logger.warning(f"Timeout hatası, yeniden deneniyor: {payment_id}")
raise self.retry(exc=exc, countdown=self.default_retry_delay * (self.request.retries + 1))
except Exception as exc:
logger.error(f"Ödeme işleme hatası: {payment_id} - {exc}")
if self.request.retries >= self.max_retries:
# Maksimum deneme sayısına ulaşıldı, dead letter queue'ya gönder
send_to_dead_letter_queue(event_data, str(exc))
mark_failed(payment_id, str(exc))
raise
def is_already_processed(payment_id):
"""Redis üzerinde idempotency kontrolü"""
import redis
r = redis.Redis(host='localhost', port=6379, db=2)
return r.exists(f"processed:payment:{payment_id}")
def mark_processing(payment_id, task_id):
import redis
r = redis.Redis(host='localhost', port=6379, db=2)
r.setex(f"processing:payment:{payment_id}", 3600, task_id)
def mark_completed(payment_id):
import redis
r = redis.Redis(host='localhost', port=6379, db=2)
r.delete(f"processing:payment:{payment_id}")
# 24 saat boyunca tamamlandı olarak işaretle
r.setex(f"processed:payment:{payment_id}", 86400, "1")
@celery.task(bind=True, name='workers.tasks.send_notification', max_retries=5)
def send_notification(self, event_data, notification_type):
"""Bildirim gönderme task'ı"""
customer_id = event_data.get('data', {}).get('object', {}).get('customer')
try:
if notification_type == 'payment_failed':
# SMS ve e-posta ile bildirim gönder
customer = get_customer_details(customer_id)
send_sms(customer['phone'], "Ödemeniz başarısız oldu. Lütfen kart bilgilerinizi kontrol edin.")
send_email(
customer['email'],
subject="Ödeme Başarısız",
template="payment_failed",
context={'customer': customer}
)
return {'status': 'sent', 'customer_id': customer_id}
except Exception as exc:
logger.error(f"Bildirim gönderilemedi: {exc}")
raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
Dead Letter Queue Yönetimi
Maksimum deneme sayısını aşan task’ları kaybetmemek için dead letter queue kullanıyoruz:
# workers/dead_letter.py
import json
import redis
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
r = redis.Redis(host='localhost', port=6379, db=3)
DLQ_KEY = 'dead_letter_queue'
def send_to_dead_letter_queue(event_data, error_message, metadata=None):
"""Başarısız işlemi dead letter queue'ya ekler"""
dlq_entry = {
'event_data': event_data,
'error': error_message,
'failed_at': datetime.utcnow().isoformat(),
'metadata': metadata or {}
}
r.lpush(DLQ_KEY, json.dumps(dlq_entry))
logger.error(f"İşlem DLQ'ya eklendi: {event_data.get('id', 'unknown')}")
def get_dlq_items(count=10):
"""DLQ'dan item listeler"""
items = r.lrange(DLQ_KEY, 0, count - 1)
return [json.loads(item) for item in items]
def retry_dlq_item(index):
"""DLQ'dan bir item'ı yeniden dener"""
from workers.tasks import process_payment
items = r.lrange(DLQ_KEY, index, index)
if not items:
return False
item = json.loads(items[0])
event_data = item['event_data']
# Yeniden kuyruğa at
process_payment.apply_async(
args=[event_data],
queue='default',
countdown=0
)
# DLQ'dan kaldır
r.lrem(DLQ_KEY, 1, items[0])
logger.info(f"DLQ item yeniden kuyruğa eklendi")
return True
def drain_dlq():
"""Tüm DLQ'yu yeniden işler"""
count = r.llen(DLQ_KEY)
logger.info(f"DLQ'da {count} item var, yeniden işleniyor...")
for i in range(count):
retry_dlq_item(0)
import time
time.sleep(1) # Rate limiting
logger.info("DLQ draining tamamlandı")
Worker’ları Başlatma ve Yönetme
# Farklı kuyruklar için farklı worker'lar başlat
# High priority worker - daha fazla concurrency
celery -A celery_app worker
--queues=high_priority
--concurrency=8
--prefetch-multiplier=1
--loglevel=info
--logfile=logs/worker_high.log
--hostname=worker_high@%h
--detach
# Default queue worker
celery -A celery_app worker
--queues=default
--concurrency=4
--prefetch-multiplier=1
--loglevel=info
--logfile=logs/worker_default.log
--hostname=worker_default@%h
--detach
# Low priority worker
celery -A celery_app worker
--queues=low_priority
--concurrency=2
--loglevel=warning
--logfile=logs/worker_low.log
--hostname=worker_low@%h
--detach
# Flower monitoring başlat
celery -A celery_app flower
--port=5555
--basic_auth=admin:guclu_sifre
--url_prefix=flower &
# Kuyruk durumunu kontrol et
celery -A celery_app inspect active
celery -A celery_app inspect reserved
celery -A celery_app inspect stats
# Belirli bir worker'ı güvenli şekilde kapat
celery -A celery_app control shutdown worker_high@hostname
# Tüm worker'lara görev iptal gönder
celery -A celery_app purge
Systemd ile Servis Yönetimi
# /etc/systemd/system/celery-webhook.service
cat > /etc/systemd/system/celery-webhook.service << 'EOF'
[Unit]
Description=Webhook Celery Worker
After=network.target redis.service
[Service]
Type=forking
User=webhook_user
Group=webhook_user
WorkingDirectory=/opt/webhook_system
Environment=CELERY_BROKER_URL=redis://localhost:6379/0
Environment=CELERY_RESULT_BACKEND=redis://localhost:6379/1
ExecStart=/opt/webhook_system/webhook_env/bin/celery
multi start
worker_high worker_default worker_low
-A celery_app
--logfile=/opt/webhook_system/logs/%n.log
--pidfile=/var/run/celery/%n.pid
-Q:worker_high high_priority
-Q:worker_default default
-Q:worker_low low_priority
--concurrency:worker_high=8
--concurrency:worker_default=4
--concurrency:worker_low=2
ExecStop=/opt/webhook_system/webhook_env/bin/celery
multi stopwait
worker_high worker_default worker_low
--pidfile=/var/run/celery/%n.pid
ExecReload=/opt/webhook_system/webhook_env/bin/celery
multi restart
worker_high worker_default worker_low
-A celery_app
--pidfile=/var/run/celery/%n.pid
RuntimeDirectory=celery
RuntimeDirectoryMode=0755
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
mkdir -p /var/run/celery
chown webhook_user:webhook_user /var/run/celery
systemctl daemon-reload
systemctl enable celery-webhook
systemctl start celery-webhook
systemctl status celery-webhook
Monitoring ve Alerting
Kuyruk yönetiminde monitoring hayati önem taşır. Kuyruk boyutu aniden şişmeye başladıysa ya worker yetersiz kalıyordur ya da bir servis hata veriyordur.
# monitoring/queue_monitor.py
import redis
import json
import requests
from datetime import datetime
r = redis.Redis(host='localhost', port=6379, db=0)
ALERT_THRESHOLDS = {
'high_priority': 50,
'default': 200,
'low_priority': 1000,
'dead_letter': 10
}
SLACK_WEBHOOK = 'https://hooks.slack.com/services/xxx/yyy/zzz'
def get_queue_lengths():
"""Tüm kuyruk uzunluklarını döner"""
lengths = {}
for queue_name in ALERT_THRESHOLDS.keys():
if queue_name == 'dead_letter':
lengths[queue_name] = r.llen('dead_letter_queue')
else:
lengths[queue_name] = r.llen(queue_name)
return lengths
def send_slack_alert(queue_name, current_length, threshold):
message = {
"text": f":warning: *Webhook Kuyruk Uyarısı*",
"attachments": [{
"color": "danger",
"fields": [
{"title": "Kuyruk", "value": queue_name, "short": True},
{"title": "Mevcut Boyut", "value": str(current_length), "short": True},
{"title": "Eşik", "value": str(threshold), "short": True},
{"title": "Zaman", "value": datetime.utcnow().isoformat(), "short": True}
]
}]
}
try:
requests.post(SLACK_WEBHOOK, json=message, timeout=5)
except Exception as e:
print(f"Slack alert gönderilemedi: {e}")
def check_and_alert():
lengths = get_queue_lengths()
for queue_name, length in lengths.items():
threshold = ALERT_THRESHOLDS[queue_name]
if length > threshold:
send_slack_alert(queue_name, length, threshold)
print(f"ALERT: {queue_name} kuyruğu {length} item (eşik: {threshold})")
else:
print(f"OK: {queue_name} = {length} item")
if __name__ == '__main__':
check_and_alert()
Bu monitoring script’ini cron ile her 5 dakikada çalıştır:
# crontab -e
*/5 * * * * /opt/webhook_system/webhook_env/bin/python /opt/webhook_system/monitoring/queue_monitor.py >> /opt/webhook_system/logs/monitor.log 2>&1
Nginx Yapılandırması
# /etc/nginx/sites-available/webhook
cat > /etc/nginx/sites-available/webhook << 'EOF'
upstream webhook_app {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
keepalive 32;
}
server {
listen 443 ssl http2;
server_name webhooks.sirketim.com;
ssl_certificate /etc/letsencrypt/live/webhooks.sirketim.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/webhooks.sirketim.com/privkey.pem;
# Rate limiting - aynı IP'den çok fazla istek gelmesin
limit_req_zone $binary_remote_addr zone=webhook_limit:10m rate=50r/m;
limit_req zone=webhook_limit burst=20 nodelay;
location /webhook/ {
proxy_pass http://webhook_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# Timeout ayarları - webhook'ların hızlı cevap alması lazım
proxy_connect_timeout 5s;
proxy_read_timeout 10s;
proxy_send_timeout 10s;
# Body size limiti
client_max_body_size 1M;
}
# Flower monitoring sadece iç ağdan erişilebilsin
location /flower/ {
allow 10.0.0.0/8;
allow 192.168.0.0/16;
deny all;
proxy_pass http://127.0.0.1:5555/;
proxy_set_header Host $host;
}
}
EOF
ln -s /etc/nginx/sites-available/webhook /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
Yaygın Sorunlar ve Çözümleri
Pratikte en çok karşılaştığın sorunlardan biri worker’ların çökmesi ve kuyruktaki işlerin kaybolması. Bunun için acks_late=True ve task_reject_on_worker_lost=True ayarlarını mutlaka kullan. Bu ayarlarla task, worker onu başarıyla tamamlamadan önce acknowledge edilmez. Worker çökerse task kuyruğa geri döner.
Kuyruk şişmesi de sık karşılaşılan bir durum. Bir servise bağımlı task’lar o servis çöktüğünde birikmeye başlar. Bunun için task_time_limit ve task_soft_time_limit ayarlarını kullan. Soft limit geldiğinde task’a temizlik yapması için zaman tanınır, hard limit geldiğinde zorla öldürülür.
Duplicate task sorunu için idempotency key kullanımını başından tasarla. Her event’in benzersiz ID’sini Redis’te 24 saat tutmak genellikle yeterlidir.
Memory leak sorunları uzun çalışan worker’larda ortaya çıkabilir. worker_max_tasks_per_child=200 ile her worker belirli sayıda task işledikten sonra yeniden başlatılır.
Sonuç
Webhook ile asenkron kuyruk yönetimi, production ortamında kararlı bir entegrasyon altyapısı kurmanın temelidir. Özetlersek:
- Webhook endpoint’i sadece doğrulama ve kuyruğa ekleme yapmalı, iş mantığı worker’da olmalı
- İmza doğrulamasını asla atlama, bu güvenliğin ilk katmanı
- Task’larını idempotent yaz, aynı işin iki kez çalışması sonucu değiştirmemeli
- Farklı öncelikli işler için farklı kuyruklar ve worker pool’ları kullan
- Dead letter queue ile başarısız işlemleri kaybetme
- Kuyruk boyutlarını monitör et ve eşik aşıldığında anında uyarı al
- Systemd ile worker’ları servis olarak çalıştır, her çöküşte otomatik ayağa kalksın
Bu yapıyı kurduğunda hem gelen webhook’lara anında cevap verebilir hem de arka planda tüm iş mantığını güvenle çalıştırabilirsin. Harici servisler “bu adam güvenilir, webhook’larımı işliyor” der ve mutlu olurlar. Sen de gece saat 3’te “ödeme webhook’ları işlenmiyor” diye uyandırılmazsın.
