Celery Retry Mekanizması ile Hata Toleranslı Görevler
Bir görev kuyruğu sistemi kuruyorsunuz, her şey mükemmel çalışıyor, deployment’a alıyorsunuz ve iki gün sonra şunu görüyorsunuz: “Task failed: ConnectionError – Remote end closed connection without response.” Üstelik bu hata sessiz sedasız geliyor, görev ölüyor ve kimse farkında bile olmuyor. İşte tam bu noktada Celery’nin retry mekanizması hayat kurtarıcı oluyor.
Bu yazıda sıfırdan retry konfigürasyonunu ele alacağım, ama salt dokümantasyon özeti değil. Gerçek prodüksiyon ortamlarında karşılaştığım sorunları ve bu sorunları nasıl çözdüğümü anlatacağım.
Neden Retry Mekanizması Bu Kadar Kritik?
Dağıtık sistemlerde şunu kabul etmek gerekiyor: her şey er ya da geç başarısız olur. Ağ bağlantısı kopar, veritabanı kısa süreliğine yanıt vermez, üçüncü parti bir API 503 döner, bellek yetersiz kalır. Bu geçici hatalar için görevi tamamen silip atmak yerine, makul aralıklarla tekrar denemek çok daha sağlıklı bir yaklaşım.
Celery bu konuda oldukça güçlü araçlar sunuyor. Ama bu araçları doğru kullanmak, yanlış kullanmak kadar önemli. Yanlış konfigüre edilmiş retry mantığı worker’larınızı tıkayabilir, RabbitMQ veya Redis kuyruklarınızı şişirebilir ve sisteminizin genel sağlığını bozabilir.
Temel Retry Kullanımı
En basit haliyle bir görevi retry etmek için self.retry() metodunu kullanıyoruz. Bunun için görev fonksiyonunun bind=True ile tanımlanması gerekiyor:
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
import requests
@shared_task(bind=True, max_retries=3)
def harici_api_cagir(self, kullanici_id: int):
try:
response = requests.get(
f"https://api.ornek.com/kullanici/{kullanici_id}",
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as exc:
try:
# 60 saniye sonra tekrar dene
raise self.retry(exc=exc, countdown=60)
except MaxRetriesExceededError:
# Maksimum deneme sayısına ulaşıldı
logger.error(
f"Kullanici {kullanici_id} icin API cagrisi basarisiz oldu, "
f"maksimum deneme sayisina ulasildi."
)
raise
Burada dikkat edilmesi gereken birkaç nokta var. self.retry() bir exception fırlatıyor, yani raise ile kullanmanız gerekiyor. Bu davranış başlangıçta tuhaf gelebilir ama Celery’nin görevi yeniden kuyruğa alması için bu tasarım bilinçli olarak yapılmış.
Exponential Backoff ile Akıllı Bekleme Süreleri
Sabit bir countdown değeri çoğu zaman yeterli olmuyor. Özellikle bir servis geçici olarak çöktüğünde, onlarca görevin aynı anda o servisi dövmesi durumu daha da kötüleştirebilir. Exponential backoff tam burada devreye giriyor:
import math
import logging
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
logger = logging.getLogger(__name__)
@shared_task(
bind=True,
max_retries=5,
default_retry_delay=30
)
def odeme_islemi_yap(self, siparis_id: int, tutar: float):
try:
# Ödeme gateway çağrısı
sonuc = odeme_gateway.charge(siparis_id, tutar)
return {"basarili": True, "transaction_id": sonuc.transaction_id}
except odeme_gateway.GeciciHata as exc:
# Exponential backoff: 2^retry_sayisi * 30 saniye
bekleme_suresi = (2 ** self.request.retries) * 30
logger.warning(
f"Siparis {siparis_id} odeme hatasi. "
f"Deneme {self.request.retries + 1}/5. "
f"{bekleme_suresi}sn sonra tekrar denenecek. "
f"Hata: {str(exc)}"
)
raise self.retry(exc=exc, countdown=bekleme_suresi)
except odeme_gateway.KaliciHata as exc:
# Kalıcı hatalarda retry etme, direkt başarısız say
logger.error(f"Siparis {siparis_id} kalici odeme hatasi: {str(exc)}")
raise
Bu örnekte dikkat edin: geçici hatalar için retry yaparken, kalıcı hatalar için retry yapmıyoruz. Bir kartın limitinin dolması geçici bir durum değil, tekrar denemenin anlamı yok. Bu ayrımı kodunuzda net tutmak çok önemli.
Task Decorator Üzerinden Global Retry Konfigürasyonu
Her görev için ayrı ayrı retry ayarı yazmak yerine, decorator seviyesinde konfigürasyon da tanımlayabilirsiniz:
from celery import shared_task
from kombu.exceptions import OperationalError
@shared_task(
bind=True,
autoretry_for=(
ConnectionError,
TimeoutError,
OperationalError,
),
retry_kwargs={
"max_retries": 4,
"countdown": 10
},
retry_backoff=True,
retry_backoff_max=600, # Maksimum 10 dakika bekle
retry_jitter=True # Thundering herd problemini önle
)
def veritabani_raporu_olustur(self, rapor_id: int):
"""
retry_jitter=True kullanımı önemli: birden fazla görev aynı
anda retry'a girdiğinde, hepsinin aynı saniyede tekrar denemesini engeller.
"""
with db.transaction():
rapor = Rapor.objects.get(id=rapor_id)
rapor.hesapla()
rapor.kaydet()
return rapor.id
retry_jitter=True parametresini mutlaka kullanın. Bunu ilk öğrendiğimde şaşırmıştım ama pratikte çok büyük fark yaratıyor. Diyelim ki bir veritabanı geçici olarak 5 saniye kapandı ve 200 görev retry bekliyordu. Jitter olmadan bu 200 görev tam olarak aynı anda veritabanına saldırır. Jitter ile her biri farklı bir anda deneme yapar, yük dağılır.
Celery Signals ile Retry Takibi
Prodüksiyon ortamında retry’ları izlemeniz gerekiyor. Kaç kez denendi, hangi görev daha çok başarısız oluyor, retry’lar ne kadar sürüyor? Bunları anlamak için Celery signals mekanizmasını kullanabilirsiniz:
from celery import signals
import statsd # veya prometheus_client
# statsd bağlantısı
stats = statsd.StatsClient('localhost', 8125)
@signals.task_retry.connect
def gorev_retry_takip(sender, request, reason, einfo, **kwargs):
"""Her retry'da bu sinyal tetiklenir."""
gorev_adi = sender.name
deneme_no = request.retries
# Metrik gönder
stats.incr(f"celery.retry.{gorev_adi}")
stats.gauge(f"celery.retry_no.{gorev_adi}", deneme_no)
logger.warning(
f"RETRY | Gorev: {gorev_adi} | "
f"Deneme: {deneme_no} | "
f"Sebep: {reason} | "
f"Task ID: {request.id}"
)
@signals.task_failure.connect
def gorev_basarisiz_takip(sender, task_id, exception, **kwargs):
"""Tüm retry'lar bittikten sonra görev başarısız olduğunda."""
gorev_adi = sender.name
stats.incr(f"celery.failure.{gorev_adi}")
# Kritik görevler için anında bildirim
kritik_gorevler = [
"myapp.tasks.odeme_islemi_yap",
"myapp.tasks.fatura_gonder"
]
if gorev_adi in kritik_gorevler:
bildirim_gonder(
kanal="ops-alert",
mesaj=f"KRITiK: {gorev_adi} tamamen basarisiz oldu! "
f"Task ID: {task_id} | Hata: {str(exception)}"
)
Dead Letter Queue ile Başarısız Görevlerin Yönetimi
Tüm retry’lar bittikten sonra görev nereye gidiyor? Varsayılan davranış görevi silmek. Ama prodüksiyon ortamında başarısız görevleri dead letter queue (DLQ) veya ayrı bir kuyruğa atmak çok daha akıllıca:
# celery_config.py
from kombu import Queue, Exchange
# Dead letter exchange tanımı
dead_letter_exchange = Exchange(
"dead_letters",
type="direct",
durable=True
)
task_queues = (
Queue(
"varsayilan",
Exchange("varsayilan"),
routing_key="varsayilan",
queue_arguments={
"x-dead-letter-exchange": "dead_letters",
"x-dead-letter-routing-key": "basarisiz_gorevler",
"x-message-ttl": 86400000 # 24 saat (ms cinsinden)
}
),
Queue(
"basarisiz_gorevler",
dead_letter_exchange,
routing_key="basarisiz_gorevler"
),
Queue(
"oncelikli",
Exchange("oncelikli"),
routing_key="oncelikli",
),
)
# Görev yönlendirme
task_routes = {
"myapp.tasks.odeme_*": {"queue": "oncelikli"},
"myapp.tasks.*": {"queue": "varsayilan"},
}
Bu konfigürasyonla birlikte, başarısız olan görevler otomatik olarak basarisiz_gorevler kuyruğuna düşüyor. Oradan manuel inceleme yapabilir, gerekirse görevi yeniden kuyruğa alabilirsiniz.
Gerçek Dünya Senaryosu: E-posta Gönderimi
Biraz daha gerçekçi bir örnek üzerinden gidelim. E-ticaret uygulamalarında e-posta gönderimi klasik bir async görev örneği. Ama SMTP sunucunuz bazen yavaş yanıt veriyor, bazen rate limit’e takılıyorsunuz:
import smtplib
import logging
from typing import Optional
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from django.core.mail import send_mail
from django.core.cache import cache
logger = logging.getLogger(__name__)
@shared_task(
bind=True,
max_retries=3,
soft_time_limit=30,
time_limit=60,
acks_late=True # Görev tamamlanana kadar acknowledge etme
)
def siparis_onay_maili_gonder(
self,
siparis_id: int,
alici_email: str,
alici_ad: str
) -> dict:
"""
acks_late=True önemli: worker crash olursa görev yeniden kuyruğa alınır.
Ama bu idempotency gerektirir - aynı mail iki kez gönderilmemeli.
"""
# Idempotency kontrolü - aynı sipariş için mail gönderildi mi?
cache_key = f"mail_gonderildi_{siparis_id}"
if cache.get(cache_key):
logger.info(f"Siparis {siparis_id} icin mail zaten gonderilmis, atlaniyor.")
return {"durum": "atland", "sebep": "zaten_gonderilmis"}
try:
send_mail(
subject=f"Siparişiniz Alındı - #{siparis_id}",
message=f"Merhaba {alici_ad}, siparişiniz başarıyla alındı.",
from_email="[email protected]",
recipient_list=[alici_email],
fail_silently=False
)
# Başarılı gönderim sonrası cache'e kaydet (24 saat)
cache.set(cache_key, True, timeout=86400)
logger.info(f"Siparis {siparis_id} onay maili gonderildi: {alici_email}")
return {"durum": "gonderildi", "alici": alici_email}
except smtplib.SMTPServerDisconnected as exc:
bekleme = 2 ** self.request.retries * 60 # 60, 120, 240 saniye
logger.warning(f"SMTP baglanti koptu, {bekleme}sn sonra tekrar denenecek.")
raise self.retry(exc=exc, countdown=bekleme)
except smtplib.SMTPException as exc:
hata_mesaji = str(exc)
# Rate limit hatası mı?
if "Too many requests" in hata_mesaji or "421" in hata_mesaji:
raise self.retry(exc=exc, countdown=300) # 5 dakika bekle
# Geçersiz alıcı gibi kalıcı hatalarda retry etme
if "550" in hata_mesaji or "553" in hata_mesaji:
logger.error(f"Gecersiz mail adresi: {alici_email}")
raise # Retry yok
# Diğer hatalar
raise self.retry(exc=exc, countdown=120)
Bu örnekteki acks_late=True ve idempotency kontrolü kombinasyonu özellikle önemli. acks_late olmadan, worker mesajı aldığı anda acknowledge eder. Worker crash olursa görev kaybolur. acks_late ile görev tamamlanana kadar bekler. Ama bu sefer de worker crash olursa görev yeniden çalışır, yani aynı mail iki kez gidebilir. Cache ile idempotency kontrolü bu problemi çözüyor.
Retry Durumunu İzleme ve Celery Flower
Retry’larınızın gerçekten çalışıp çalışmadığını, kaç görevin retry’da olduğunu görmek için Celery Flower’ı kullanabilirsiniz:
# Flower kurulumu
pip install flower
# Flower başlatma
celery -A myproject flower --port=5555 --basic_auth=admin:sifre123
# Belirli bir kuyruğun durumunu CLI'dan kontrol etme
celery -A myproject inspect active_queues
# Retry'daki görevleri listele
celery -A myproject inspect reserved
# Belirli bir görevi iptal et
celery -A myproject control revoke <task_id> --terminate
# Worker istatistiklerini görüntüle
celery -A myproject inspect stats | python -m json.tool
Flower’ın sunduğu görsel arayüz ile hangi görevlerin retry’da beklediğini, ortalama retry süresini ve başarı oranlarını anlık takip edebilirsiniz. Bunu bir Grafana dashboard’uyla birleştirirseniz, operasyonel görünürlük ciddi ölçüde artıyor.
Genel Hataları ve Tuzakları
Celery retry kullanırken sık yapılan hataların başında retry’ı sonsuz döngüye sokmak geliyor. max_retries=None ayarı görevi sonsuza kadar retry’a sokar. Prodüksiyon ortamında bu ayarı asla kullanmayın.
İkinci yaygın hata, büyük objeleri görev parametresi olarak geçmek. Bir göreve büyük bir dictionary veya liste geçerseniz, bu veri RabbitMQ veya Redis’te depolanıyor. Retry’lar arttıkça bu mesaj kuyruğu şişmeye başlar. Bunun yerine her zaman ID geçip görevi içinde veriyi çekin:
# YANLIS - Büyük obje geçme
@shared_task(bind=True)
def rapor_isle(self, rapor_verisi: dict): # 50MB'lık veri
pass
# DOGRU - ID geçip içeride çek
@shared_task(bind=True, max_retries=3)
def rapor_isle(self, rapor_id: int):
rapor = Rapor.objects.select_related("kullanici").get(id=rapor_id)
# işlemler...
Üçüncü önemli nokta, transaction içinde görev başlatmak. Django kullanıyorsanız, bir veritabanı transaction’ı içinden Celery görevi başlatırsanız ve transaction rollback olursa, görev zaten kuyruğa alınmış olur ama ilgili veri veritabanında olmayabilir. Bu durumda transaction.on_commit kullanın:
from django.db import transaction
def siparis_olustur(siparis_verisi):
with transaction.atomic():
siparis = Siparis.objects.create(**siparis_verisi)
# Transaction commit olunca görevi kuyruğa al
transaction.on_commit(
lambda: siparis_onay_maili_gonder.delay(
siparis_id=siparis.id,
alici_email=siparis.musteri.email,
alici_ad=siparis.musteri.ad
)
)
return siparis
Sonuç
Celery’nin retry mekanizması doğru kullanıldığında sistemlerinizi gerçekten hata toleranslı hale getirebilir. Özetlemek gerekirse:
- Geçici ve kalıcı hataları ayırt edin: Her hata retry’a layık değil. Ağ hatası retry’a girer, geçersiz veri hatası girmez.
- Exponential backoff ve jitter kullanın: Sabit bekleme süreleri thundering herd problemine yol açar.
acks_late=Trueile idempotency’yi birlikte düşünün: Biri olmadan diğeri sorun çıkarır.- Retry’larınızı metriklerle takip edin: Kaç retry yapıldığını bilmeden sisteminizi iyileştiremezsiniz.
- Büyük obje yerine ID geçin: Kuyruk şişmesinin en yaygın sebebi bu.
- DLQ konfigüre edin: Tamamen başarısız görevler uçup gitmesin, incelenebilsin.
transaction.on_commitkullanın: Django transaction’larından görev başlatırken bu şart.
Bu konular başlangıçta karmaşık görünebilir, ama bir kez doğru konfigüre ettiğinizde sistem neredeyse kendi kendine toparlanmaya başlıyor. En büyük tatmin de zaten o: sabah uyandığınızda gecenin geçici ağ problemlerinin retry mekanizması sayesinde kendiliğinden çözüldüğünü görmek.
