Celery ile E-posta Gönderimi: Asenkron Yapılandırma Rehberi
Üretim ortamında e-posta gönderimi düşündüğünüzden çok daha karmaşık bir meseleye dönüşebilir. Kullanıcı “Şifremi unuttum” butonuna tıkladığında, arkada ne olduğunu düşünmüş müydünüz? Eğer o e-posta isteği senkron işleniyorsa, SMTP sunucusuna bağlantı kurulana kadar kullanıcı bekleme ekranında asılı kalır. Sunucunuz o an yavaşsa, SMTP timeout yaşıyorsa veya toplu e-posta gönderimi yapıyorsanız, web uygulamanız adeta felç olur. İşte bu yüzden Celery ile asenkron e-posta gönderimi, modern bir Django/Flask uygulamasında artık bir lüks değil, zorunluluk haline gelmiştir.
Neden Asenkron E-posta?
Bir e-ticaret sitesi düşünün. Sipariş tamamlandığında onay maili, kargo çıktığında bilgilendirme maili, her hafta promosyon bülteni. Eğer bunların hepsini senkron gönderiyorsanız, 10.000 kişilik bir bülten gönderimi sırasında web sunucunuzun worker’ları e-posta görevleriyle meşgulken yeni gelen HTTP istekleri kuyrukta bekler. Bu, production ortamında ciddi bir latency sorununa dönüşür.
Celery devreye girdiğinde tablo tamamen değişir. Web uygulaması e-posta görevini kuyruğa ekler ve hemen “görevi aldım” yanıtı döner. Arka planda Celery worker’ları bu görevleri alır, işler, gerekirse yeniden dener. Kullanıcı deneyimi bozulmaz, web sunucusu bloke olmaz.
Ortam Kurulumu
Önce gerekli paketleri yükleyelim. Ben bu yazıda Django + Redis kombinasyonunu kullanacağım çünkü Redis hem broker hem de result backend olarak mükemmel çalışıyor ve production’da yönetmesi kolay.
pip install celery redis django-celery-results
pip install celery[redis]
# Paketleri requirements.txt'e ekleyelim
pip freeze | grep -E "celery|redis" >> requirements.txt
Redis kurulumu için:
# Ubuntu/Debian
sudo apt-get install redis-server
sudo systemctl enable redis-server
sudo systemctl start redis-server
# Redis çalışıyor mu kontrol et
redis-cli ping
# PONG döndürmeli
Django Proje Yapılandırması
Proje dizin yapısı şöyle olmalı:
myproject/
myproject/
__init__.py
celery.py
settings.py
notifications/
tasks.py
email_templates/
manage.py
myproject/celery.py dosyasını oluşturuyoruz. Bu dosya Celery uygulamasını başlatır ve Django settings’e bağlar:
import os
from celery import Celery
from django.conf import settings
# Django settings modülünü belirt
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Django settings dosyasından celery konfigürasyonunu oku
# CELERY_ prefix'i ile başlayan ayarları alır
app.config_from_object('django.conf:settings', namespace='CELERY')
# Tüm Django app'lerindeki tasks.py dosyalarını otomatik bul
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
myproject/__init__.py dosyasını güncelliyoruz:
# Bu import, Django başladığında Celery app'inin yüklenmesini sağlar
from .celery import app as celery_app
__all__ = ('celery_app',)
settings.py dosyasına Celery ayarlarını ekliyoruz:
# Celery Konfigürasyonu
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Istanbul'
# E-posta gönderimlerinde önemli: task'ların maksimum kaç saniye çalışacağı
CELERY_TASK_SOFT_TIME_LIMIT = 60 # 60 saniye sonra SoftTimeLimitExceeded fırlatır
CELERY_TASK_TIME_LIMIT = 120 # 120 saniye sonra worker process'i öldürür
# Retry mekanizması için
CELERY_TASK_MAX_RETRIES = 3
CELERY_TASK_DEFAULT_RETRY_DELAY = 60 # saniye
# Result backend olarak Django DB kullanmak istiyorsanız
INSTALLED_APPS = [
...
'django_celery_results',
]
# Django DB result backend
# CELERY_RESULT_BACKEND = 'django-db'
# CELERY_CACHE_BACKEND = 'django-cache'
# E-posta ayarları (örnek: Gmail SMTP)
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_USE_TLS = True
EMAIL_HOST_USER = os.environ.get('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD = os.environ.get('EMAIL_HOST_PASSWORD')
DEFAULT_FROM_EMAIL = 'MyApp <[email protected]>'
E-posta Task’larının Yazılması
Şimdi asıl işin özüne gelelim. notifications/tasks.py dosyasını oluşturuyoruz:
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail, send_mass_mail, EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from django.conf import settings
import time
logger = get_task_logger(__name__)
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
name='notifications.tasks.send_welcome_email'
)
def send_welcome_email(self, user_id: int):
"""
Yeni kullanıcıya hoş geldin maili gönderir.
bind=True ile self erişimi sağlanır, bu retry için gerekli.
"""
from django.contrib.auth import get_user_model
User = get_user_model()
try:
user = User.objects.get(pk=user_id)
logger.info(f"Hoş geldin maili gönderiliyor: {user.email}")
# HTML template'den içerik üret
context = {
'user': user,
'site_name': 'MyApp',
'login_url': f"{settings.SITE_URL}/login/",
}
html_content = render_to_string('emails/welcome.html', context)
text_content = strip_tags(html_content)
# EmailMultiAlternatives hem HTML hem plain text destekler
msg = EmailMultiAlternatives(
subject='MyApp'e Hoş Geldiniz!',
body=text_content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=[user.email]
)
msg.attach_alternative(html_content, "text/html")
msg.send()
logger.info(f"Mail başarıyla gönderildi: {user.email}")
return {'status': 'success', 'email': user.email}
except User.DoesNotExist:
# Kullanıcı bulunamazsa retry yapmanın anlamı yok
logger.error(f"Kullanıcı bulunamadı: user_id={user_id}")
return {'status': 'error', 'reason': 'user_not_found'}
except Exception as exc:
logger.error(f"Mail gönderilemedi: {user_id}, hata: {str(exc)}")
# Exponential backoff ile retry
retry_in = 60 * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=retry_in)
@shared_task(
bind=True,
max_retries=5,
name='notifications.tasks.send_bulk_email'
)
def send_bulk_email(self, recipient_list: list, subject: str,
template_name: str, context: dict):
"""
Toplu e-posta gönderimi. Büyük listeleri chunk'lara bölerek gönderir.
"""
CHUNK_SIZE = 50 # Her seferinde 50 kişiye gönder
html_content = render_to_string(template_name, context)
text_content = strip_tags(html_content)
messages = []
for recipient in recipient_list:
messages.append((
subject,
text_content,
settings.DEFAULT_FROM_EMAIL,
[recipient]
))
# Listeyi chunk'lara böl
for i in range(0, len(messages), CHUNK_SIZE):
chunk = messages[i:i + CHUNK_SIZE]
try:
send_mass_mail(chunk, fail_silently=False)
logger.info(f"Chunk gönderildi: {i} - {i + len(chunk)}")
# Rate limiting için kısa bekleme
time.sleep(1)
except Exception as exc:
logger.error(f"Chunk gönderilemedi, index: {i}, hata: {exc}")
raise self.retry(exc=exc, countdown=120)
return {'status': 'success', 'total_sent': len(recipient_list)}
View Katmanında Kullanım
Task’ları Django view’larında nasıl kullanacağımıza bakalım:
# notifications/views.py
from django.contrib.auth import login
from django.http import JsonResponse
from .tasks import send_welcome_email, send_bulk_email
from celery import group
def register_user(request):
# ... kayıt işlemi ...
user = create_user(request.POST)
login(request, user)
# .delay() ile görevi kuyruğa ekle, anında döner
send_welcome_email.delay(user.id)
# Alternatif: .apply_async() daha fazla kontrol sağlar
# send_welcome_email.apply_async(
# args=[user.id],
# countdown=10, # 10 saniye sonra çalıştır
# expires=3600, # 1 saat sonra geçersiz say
# queue='emails', # Belirli bir kuyruğa gönder
# )
return JsonResponse({'message': 'Kayıt başarılı, hoş geldin maili gönderildi.'})
def send_newsletter(request):
"""
Bülten gönderimi: 10.000 kişiye paralel task'larla gönderim
"""
subscribers = list(
Newsletter.objects.filter(active=True).values_list('email', flat=True)
)
# Büyük listeyi küçük parçalara böl ve grup olarak çalıştır
BATCH_SIZE = 500
tasks = []
for i in range(0, len(subscribers), BATCH_SIZE):
batch = subscribers[i:i + BATCH_SIZE]
tasks.append(
send_bulk_email.s(
recipient_list=batch,
subject='Haftalık Bültenimiz',
template_name='emails/newsletter.html',
context={'month': 'Ocak', 'year': 2025}
)
)
# Tüm task'ları paralel çalıştır
job = group(tasks)
result = job.apply_async()
return JsonResponse({
'message': f'{len(subscribers)} kişiye bülten kuyruğa alındı.',
'group_id': result.id
})
Celery Worker’ı Çalıştırma ve Yönetimi
Geliştirme ortamında worker başlatmak:
# Temel worker başlatma
celery -A myproject worker --loglevel=info
# Belirli kuyrukları dinleyen worker
celery -A myproject worker --loglevel=info -Q emails,default
# Birden fazla worker process (concurrency)
celery -A myproject worker --loglevel=info --concurrency=4
# E-posta worker'ı için ayrı bir kuyruk tanımla
celery -A myproject worker --loglevel=info -Q emails --concurrency=2 -n email_worker@%h
# Flower ile monitoring (tarayıcıdan takip)
pip install flower
celery -A myproject flower --port=5555
Production ortamında Celery’yi systemd ile çalıştırmak:
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target redis.service
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
EnvironmentFile=/var/www/myproject/.env
ExecStart=/bin/sh -c '${VENV_PATH}/bin/celery multi start worker
-A myproject
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
--loglevel=INFO
--concurrency=4
-Q emails,default'
ExecStop=/bin/sh -c '${VENV_PATH}/bin/celery multi stopwait worker
--pidfile=/var/run/celery/%n.pid'
ExecReload=/bin/sh -c '${VENV_PATH}/bin/celery multi restart worker
-A myproject
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
--loglevel=INFO'
RuntimeDirectory=celery
RuntimeDirectoryMode=0755
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable celery
sudo systemctl start celery
sudo systemctl status celery
Gerçek Dünya Senaryosu: Sipariş Onay Maili
Bir e-ticaret sitesinde sipariş akışını ele alalım. Sipariş tamamlandığında hem müşteriye hem de satıcıya mail gitmelidir:
from celery import shared_task, chain, group
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True, max_retries=3, name='orders.tasks.notify_order_completion')
def notify_order_completion(self, order_id: int):
"""
Sipariş tamamlandığında birden fazla tarafı bilgilendirir.
Chain kullanarak sıralı, group kullanarak paralel çalıştırır.
"""
from orders.models import Order
try:
order = Order.objects.select_related('user', 'seller').get(pk=order_id)
# Müşteri ve satıcı maillerini paralel gönder
notification_group = group([
send_order_confirmation_to_buyer.s(order_id),
send_order_notification_to_seller.s(order_id),
])
# Paralel görev grubunu çalıştır
notification_group.apply_async()
logger.info(f"Sipariş bildirimleri kuyruğa alındı: order_id={order_id}")
return {'status': 'queued', 'order_id': order_id}
except Exception as exc:
raise self.retry(exc=exc, countdown=30)
@shared_task(bind=True, max_retries=3)
def send_order_confirmation_to_buyer(self, order_id: int):
from orders.models import Order
try:
order = Order.objects.select_related('user').prefetch_related('items').get(pk=order_id)
context = {
'order': order,
'user': order.user,
'items': order.items.all(),
'total': order.total_price,
'estimated_delivery': order.estimated_delivery_date,
}
html_content = render_to_string('emails/order_confirmation.html', context)
text_content = strip_tags(html_content)
msg = EmailMultiAlternatives(
subject=f'Siparişiniz Alındı #{order.order_number}',
body=text_content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=[order.user.email]
)
msg.attach_alternative(html_content, "text/html")
# PDF fatura oluştur ve ekle (örnek)
# invoice_pdf = generate_invoice_pdf(order)
# msg.attach(f'fatura-{order.order_number}.pdf', invoice_pdf, 'application/pdf')
msg.send()
logger.info(f"Alıcı onay maili gönderildi: {order.user.email}")
except Exception as exc:
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Hata Yönetimi ve İzleme
Production’da en çok karşılaşılan sorunlardan biri başarısız task’ların takibi. Celery’nin task_failure sinyalini kullanarak merkezi hata takibi yapabiliriz:
# myproject/celery.py dosyasına ekle
from celery.signals import task_failure, task_retry, task_success
import logging
alert_logger = logging.getLogger('celery.alerts')
@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None,
args=None, kwargs=None, traceback=None, einfo=None, **kw):
"""
Herhangi bir task başarısız olduğunda tetiklenir.
Sentry, Slack veya e-posta ile alert gönderilebilir.
"""
alert_logger.error(
f"Task başarısız: {sender.name} | "
f"Task ID: {task_id} | "
f"Hata: {exception} | "
f"Args: {args}"
)
# Sentry entegrasyonu varsa
# sentry_sdk.capture_exception(exception)
@task_retry.connect
def handle_task_retry(sender=None, reason=None, **kw):
alert_logger.warning(
f"Task yeniden deneniyor: {sender.name} | Sebep: {reason}"
)
Celery Beat ile Zamanlanmış E-postalar
Bazı senaryolarda e-postaların belirli saatlerde gönderilmesi gerekir. Celery Beat bu iş için biçilmiş kaftan:
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
...
'django_celery_beat',
]
# Zamanlanmış görevler
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'haftalik-bulten': {
'task': 'notifications.tasks.send_weekly_newsletter',
'schedule': crontab(hour=10, minute=0, day_of_week='monday'),
'args': (),
},
'gunluk-ozet': {
'task': 'notifications.tasks.send_daily_summary',
'schedule': crontab(hour=18, minute=30),
},
'inaktif-kullanici-maili': {
'task': 'notifications.tasks.remind_inactive_users',
'schedule': crontab(hour=9, minute=0, day_of_week='wednesday,friday'),
},
}
# Beat scheduler'ı başlat
celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Performans İpuçları ve Dikkat Edilmesi Gerekenler
Yıllar içinde production’da öğrendiğim bazı kritik noktalar var.
Task parametrelerinde dikkat: Task’lara büyük objeler değil, ID’ler geçirin. send_welcome_email(user) yerine send_welcome_email(user.id) kullanın. Task parametreleri Redis’te serileştirilip saklanır, büyük objeler hem bellek hem de bant genişliği tüketir.
Idempotency: E-posta task’larını idempotent yazın. Aynı task iki kez çalışırsa kullanıcı iki kez mail almamalı. Bunu önlemek için veritabanında gönderim kaydı tutabilirsiniz.
Queue segmentasyonu: Kritik mailleri (şifre sıfırlama, sipariş onay) yüksek öncelikli kuyruğa, bültenleri düşük öncelikli kuyruğa koyun:
# Kritik mail - yüksek öncelikli kuyruk
send_password_reset_email.apply_async(args=[user_id], queue='critical_emails')
# Bülten - düşük öncelikli kuyruk
send_newsletter.apply_async(args=[batch], queue='bulk_emails')
Worker concurrency ayarı: E-posta gönderimi I/O bound bir işlem olduğundan, CPU sayısının üzerinde concurrency kullanabilirsiniz. Genel kural: CPU sayısı * 2 ile başlayın, monitöring yaparak artırın.
Dead Letter Queue: Tüm retry’ları tüketen task’ların nereye gittiğini takip edin. Redis’te bu task’lar kaybolur. Önemli mailleri (örn. sipariş onayı) aynı zamanda veritabanına da kaydedin, worker başarısız olsa bile manuel müdahale mümkün olsun.
Sonuç
Celery ile asenkron e-posta gönderimi kurmanın teknik olarak çok derin bir konu olmadığını görüyorsunuz. Asıl zor olan kısım, production ortamında güvenilir, izlenebilir ve bakımı kolay bir sistem kurmak. bind=True ile retry mekanizmaları, kuyruk segmentasyonu, systemd ile worker yönetimi ve merkezi hata takibi olmadan kurduğunuz sistem günün birinde sessizce çöker ve siz binlerce mailin neden gitmediğini anlayamazsınız.
Benim önerim: Küçük başlayın. Önce temel hoş geldin mailini asenkron yapın, sistemin nasıl davrandığını gözlemleyin. Flower’ı açın, task’ların nasıl aktığını izleyin. Sonra retry mekanizmalarını ekleyin, sonra queue segmentasyonunu. Her şeyi bir anda yapmaya çalışmak yerine kademeli olarak ilerlemek, sistemin her katmanını gerçekten anlamanızı sağlar.
Bir diğer kritik nokta: E-posta gönderimini asenkron yapmak sizi SMTP rate limit sorunlarından kurtarmaz. SMTP sağlayıcınızın dakika/saat başına izin verdiği gönderim limitlerini bilin ve task’larınızı buna göre tasarlayın. AWS SES, Mailgun veya SendGrid gibi servislerle çalışıyorsanız, SDK’larının kendi retry mekanizmalarını da Celery retry’ı ile birlikte değerlendirin, ikisi çakışmasın.
