Celery Canvas ile Kompleks İş Akışı Tasarımı
Celery’yi ilk öğrendiğinizde büyük ihtimalle delay() veya apply_async() ile basit görevler çalıştırdınız. Bu yöntemler çoğu senaryo için yeterli, ama gerçek dünyada işler nadiren bu kadar basit seyreder. E-ticaret siparişi işleme, veri boru hattı yönetimi, toplu bildirim sistemleri… Bunların hepsinde birden fazla görevin belirli bir sıra veya koşula göre çalışması gerekir. İşte tam bu noktada Celery Canvas devreye girer ve oyunun kurallarını değiştirir.
Celery Canvas Nedir ve Neden Önemlidir
Canvas, Celery’nin görevleri birbirine bağlamak için sunduğu bir abstraksiyon katmanıdır. Tek bir görev çalıştırmak yerine, görev grafikleri (task graphs) oluşturmanıza olanak tanır. Birkaç ilkel yapıdan oluşur: chain, group, chord, map, starmap ve chunks. Bu yapıları bir araya getirerek oldukça karmaşık iş akışları modelleyebilirsiniz.
Peki neden bu kadar önemli? Çünkü alternatif yol, görevlerin içinde manuel olarak diğer görevleri çağırmaktır. Bu yaklaşım kısa vadede çalışır ama bakımı zorlaşır, hata yönetimi kabuğa döner ve görev bağımlılıkları kod içine gömülür. Canvas bu bağımlılıkları açık ve tanımlanmış hale getirir.
Önce ortamı hazırlayalım:
pip install celery redis
# ya da mevcut projenize eklemek için:
pip install "celery[redis]>=5.3.0"
Temel konfigürasyon:
# celery_app.py
from celery import Celery
app = Celery(
'canvas_demo',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Istanbul',
enable_utc=True,
task_track_started=True,
result_expires=3600,
)
Chain: Sıralı Görev Zincirleri
chain, bir görevin çıktısının bir sonraki görevin girişi olduğu ardışık akışları tanımlar. Unix pipe mantığıyla düşünebilirsiniz.
Gerçek senaryo olarak bir görüntü işleme hattı düşünelim. Kullanıcı bir fotoğraf yüklüyor; önce boyutlandırılacak, sonra watermark eklenecek, ardından CDN’e yüklenecek ve son olarak veritabanı kaydı güncellenecek:
# tasks/image_pipeline.py
from celery import shared_task, chain
from celery_app import app
@app.task(bind=True, max_retries=3)
def resize_image(self, image_path: str, target_size: tuple) -> dict:
"""Görüntüyü belirtilen boyuta indirgeer."""
try:
# PIL/Pillow işlemleri burada
from PIL import Image
img = Image.open(image_path)
img_resized = img.resize(target_size, Image.LANCZOS)
output_path = image_path.replace('.jpg', '_resized.jpg')
img_resized.save(output_path, optimize=True, quality=85)
return {
'original_path': image_path,
'resized_path': output_path,
'size': target_size
}
except Exception as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(bind=True)
def add_watermark(self, image_data: dict) -> dict:
"""Resized görüntüye watermark ekler."""
resized_path = image_data['resized_path']
# Watermark işlemleri...
watermarked_path = resized_path.replace('_resized', '_watermarked')
image_data['watermarked_path'] = watermarked_path
return image_data
@app.task(bind=True, max_retries=5)
def upload_to_cdn(self, image_data: dict) -> dict:
"""CDN'e yükleme yapar."""
# boto3 veya benzeri CDN SDK kullanımı
cdn_url = f"https://cdn.example.com/images/{image_data['watermarked_path'].split('/')[-1]}"
image_data['cdn_url'] = cdn_url
return image_data
@app.task
def update_database_record(image_data: dict, user_id: int) -> bool:
"""Veritabanındaki kaydı günceller."""
# Django ORM veya SQLAlchemy işlemleri
print(f"Kullanıcı {user_id} için CDN URL güncellendi: {image_data['cdn_url']}")
return True
# Chain kullanımı
def process_user_image(image_path: str, user_id: int):
workflow = chain(
resize_image.s(image_path, (800, 600)),
add_watermark.s(),
upload_to_cdn.s(),
update_database_record.s(user_id)
)
result = workflow.apply_async()
return result.id
Burada dikkat edilmesi gereken bir nokta: update_database_record görevi image_data ve user_id olmak üzere iki parametre alıyor. Chain’de önceki görevin çıktısı ilk parametre olarak otomatik geçirilir, user_id ise s() içinde ek argüman olarak tanımlanır. Bu pattern ilk bakışta kafa karıştırabilir, ama bir kez içselleştirdiğinizde çok güçlü bir araç haline gelir.
Group: Paralel Görev Grupları
group, birbirinden bağımsız görevlerin eş zamanlı çalıştırılmasını sağlar. Bağımsız API çağrıları, toplu e-posta gönderimi veya birden fazla dosyanın paralel işlenmesi gibi senaryolar için biçilmiş kaftan.
# tasks/notification.py
from celery import group
from celery_app import app
@app.task(bind=True, max_retries=3)
def send_email_notification(self, user_email: str, template: str, context: dict) -> dict:
"""E-posta bildirimi gönderir."""
try:
# SMTP veya SendGrid işlemleri
print(f"{user_email} adresine {template} şablonuyla e-posta gönderildi")
return {'channel': 'email', 'recipient': user_email, 'status': 'sent'}
except Exception as exc:
raise self.retry(exc=exc, countdown=5)
@app.task(bind=True, max_retries=2)
def send_sms_notification(self, phone: str, message: str) -> dict:
"""SMS bildirimi gönderir."""
try:
# Twilio veya Netgsm API
print(f"{phone} numarasına SMS gönderildi")
return {'channel': 'sms', 'recipient': phone, 'status': 'sent'}
except Exception as exc:
raise self.retry(exc=exc, countdown=10)
@app.task
def send_push_notification(user_id: int, title: str, body: str) -> dict:
"""Push bildirimi gönderir."""
# Firebase FCM veya APNs
return {'channel': 'push', 'recipient': user_id, 'status': 'sent'}
# Tüm kanalları paralel çalıştır
def notify_user_all_channels(user: dict, message_context: dict):
notification_group = group([
send_email_notification.s(
user['email'],
'order_confirmation',
message_context
),
send_sms_notification.s(
user['phone'],
f"Siparişiniz alındı: #{message_context['order_id']}"
),
send_push_notification.s(
user['id'],
"Sipariş Onayı",
f"#{message_context['order_id']} numaralı siparişiniz alındı."
),
])
result = notification_group.apply_async()
# GroupResult döner, tüm sonuçları bekleyebilirsiniz
return result
Chord: Group + Callback Kombinasyonu
chord benim en çok kullandığım Canvas primitive’i. Bir grup görev çalıştırırsınız ve hepsi tamamlandığında bir callback görevi tetiklenir. E-ticaret raporlamasından veri aggregation işlemlerine kadar her yerde işe yarar.
# tasks/reporting.py
from celery import chord, group
from celery_app import app
from typing import List
@app.task
def fetch_sales_data(region: str, date_range: dict) -> dict:
"""Belirli bir bölge için satış verisi çeker."""
# Veritabanı sorguları
return {
'region': region,
'total_sales': 150000, # Gerçekte DB'den gelecek
'order_count': 342,
'date_range': date_range
}
@app.task
def fetch_inventory_data(warehouse_id: int) -> dict:
"""Depo envanter bilgisini çeker."""
return {
'warehouse_id': warehouse_id,
'total_items': 8750,
'low_stock_items': 23,
}
@app.task
def fetch_customer_metrics(segment: str) -> dict:
"""Müşteri segment metriklerini çeker."""
return {
'segment': segment,
'active_customers': 1250,
'churn_rate': 0.034,
}
@app.task
def generate_consolidated_report(results: List[dict]) -> str:
"""Tüm veriler toplandıktan sonra konsolide rapor üretir."""
report = {
'generated_at': 'now',
'data_sources': len(results),
'sections': results
}
# PDF üretme veya e-posta gönderme
report_path = f"/reports/consolidated_{len(results)}_sections.pdf"
print(f"Rapor oluşturuldu: {report_path}")
print(f"İşlenen veri kaynağı sayısı: {len(results)}")
return report_path
def build_weekly_report(date_range: dict):
"""Haftalık rapor için chord yapısını kurar."""
data_tasks = group([
fetch_sales_data.s('istanbul', date_range),
fetch_sales_data.s('ankara', date_range),
fetch_sales_data.s('izmir', date_range),
fetch_inventory_data.s(1), # Ana depo
fetch_inventory_data.s(2), # İkinci depo
fetch_customer_metrics.s('premium'),
fetch_customer_metrics.s('standard'),
])
report_workflow = chord(data_tasks)(generate_consolidated_report.s())
return report_workflow
Chord kullanırken kritik bir nokta: result backend’in mutlaka konfigüre edilmiş olması gerekir. Backend olmadan chord callback’i asla tetiklenmez ve sessizce başarısız olur. Bunu production’da bir kez atlayıp saatlerce debug etmek zorunda kaldım; o günden sonra konfigürasyon checklist’imin ilk maddesi bu oldu.
Karmaşık İç İçe Yapılar: Chain + Chord + Group
Gerçek dünya senaryolarında bu primitive’leri bir arada kullanmak gerekir. Bir e-ticaret sipariş işleme akışını ele alalım:
# tasks/order_processing.py
from celery import chain, chord, group
from celery_app import app
@app.task(bind=True, max_retries=3)
def validate_order(self, order_id: int) -> dict:
"""Sipariş doğrulama: stok kontrolü, ödeme kontrolü, adres doğrulama."""
try:
# Doğrulama mantığı
return {
'order_id': order_id,
'validated': True,
'customer_id': 456,
'items': [
{'sku': 'PROD-001', 'qty': 2, 'warehouse': 1},
{'sku': 'PROD-002', 'qty': 1, 'warehouse': 2},
]
}
except Exception as exc:
raise self.retry(exc=exc, countdown=5)
@app.task
def reserve_stock(order_data: dict) -> dict:
"""Her ürün için stok rezervasyonu yapar."""
for item in order_data['items']:
print(f"Stok rezerve edildi: {item['sku']} x{item['qty']}")
order_data['stock_reserved'] = True
return order_data
@app.task(bind=True, max_retries=5)
def process_payment(self, order_data: dict) -> dict:
"""Ödeme işlemini gerçekleştirir."""
try:
# Ödeme gateway entegrasyonu
order_data['payment_status'] = 'completed'
order_data['transaction_id'] = 'TXN-789012'
return order_data
except Exception as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
@app.task
def prepare_shipment_label(item: dict, order_id: int) -> dict:
"""Her depo için kargo etiketi hazırlar."""
return {
'warehouse': item['warehouse'],
'sku': item['sku'],
'label_url': f"/labels/{order_id}_{item['warehouse']}.pdf"
}
@app.task
def notify_warehouses(label_results: list) -> bool:
"""Tüm kargo etiketleri hazır olduğunda depolara bildirim gönderir."""
warehouses = set(r['warehouse'] for r in label_results)
for wh in warehouses:
print(f"Depo {wh} bilgilendirildi")
return True
@app.task
def send_order_confirmation(order_data: dict) -> None:
"""Müşteriye sipariş onay e-postası gönderir."""
print(f"Sipariş {order_data['order_id']} için onay e-postası gönderildi")
def process_order(order_id: int):
"""
Akış:
1. Sipariş doğrula
2. Stok rezerve et
3. Ödeme al
4. Paralel: kargo etiketlerini hazırla (her ürün için ayrı)
5. Tüm etiketler hazır olunca: depolara bildir
6. Müşteriye onay e-postası gönder
"""
def build_shipment_workflow(order_data):
"""Bu kısım dinamik olarak chord oluşturur."""
label_tasks = group([
prepare_shipment_label.s(item, order_id)
for item in order_data['items']
])
return chord(label_tasks)(notify_warehouses.s())
# Ana akış
workflow = chain(
validate_order.s(order_id),
reserve_stock.s(),
process_payment.s(),
send_order_confirmation.s(),
)
return workflow.apply_async()
Hata Yönetimi ve İzleme
Canvas iş akışlarında hata yönetimi ayrı bir dikkat gerektirir. Zincirde herhangi bir görev başarısız olursa sonraki görevler çalışmaz; bu istediğiniz davranış olabilir, ama her zaman değil.
# tasks/resilient_tasks.py
from celery import chain, group
from celery_app import app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=30,
acks_late=True,
reject_on_worker_lost=True
)
def critical_data_sync(self, source_id: int) -> dict:
"""Kritik veri senkronizasyonu. Kayıp tolere edilemez."""
try:
logger.info(f"Senkronizasyon başladı: source_id={source_id}")
# Senkronizasyon mantığı
result = {'source_id': source_id, 'synced_records': 1500}
logger.info(f"Senkronizasyon tamamlandı: {result}")
return result
except ConnectionError as exc:
logger.warning(f"Bağlantı hatası, yeniden deneniyor: {exc}")
raise self.retry(exc=exc, countdown=60)
except Exception as exc:
logger.error(f"Kritik hata: {exc}", exc_info=True)
# Slack/PagerDuty bildirimi burada tetiklenebilir
raise
# Hata durumunda callback tanımlama
@app.task
def on_workflow_error(request, exc, traceback):
"""Görev başarısız olduğunda çağrılır."""
logger.error(
f"İş akışı hatası: task_id={request.id}, "
f"exception={exc}, "
f"traceback={traceback}"
)
# Alerting sistemine bildir
# link_error ile hata callback'i bağlama
def create_resilient_chain(data_ids: list):
tasks = [
critical_data_sync.s(data_id).set(
link_error=on_workflow_error.s()
)
for data_id in data_ids
]
return group(tasks).apply_async()
Görev Durumu İzleme
# monitoring/task_tracker.py
from celery.result import GroupResult, AsyncResult
from celery_app import app
import time
def monitor_workflow(result_id: str, poll_interval: float = 2.0):
"""İş akışı durumunu izler ve raporlar."""
result = AsyncResult(result_id, app=app)
status_map = {
'PENDING': 'Kuyrukta bekliyor',
'STARTED': 'Çalışıyor',
'SUCCESS': 'Tamamlandı',
'FAILURE': 'Başarısız',
'RETRY': 'Yeniden deneniyor',
'REVOKED': 'İptal edildi',
}
while not result.ready():
current_status = result.status
print(f"Durum: {status_map.get(current_status, current_status)}")
if result.info and isinstance(result.info, dict):
progress = result.info.get('progress', 0)
print(f"İlerleme: %{progress}")
time.sleep(poll_interval)
if result.successful():
print(f"Akış başarıyla tamamlandı. Sonuç: {result.result}")
else:
print(f"Akış başarısız oldu. Hata: {result.result}")
return result.result
# Worker başlatma komutu
# celery -A celery_app worker --loglevel=info --concurrency=8 -Q default,high_priority
Production’da Canvas Kullanırken Dikkat Edilmesi Gerekenler
Birkaç yıllık Canvas kullanımı sonrasında edindiğim deneyimlerden:
Chord ve backend zorunluluğu: Daha önce de vurguladım ama tekrarlamaya değer. Redis veya RabbitMQ backend olmadan chord’lar sessizce çalışmaz. CELERY_RESULT_BACKEND ayarını kontrol edin.
Zincir uzunluğuna dikkat edin: Çok uzun chain’ler, her görev geçişinde serialization/deserialization yükü oluşturur. 10-15 görevden uzun zincirler için alternatif mimari düşünün.
İmmutable signature kullanımı: Bir göreve önceki sonucu geçirmek istemiyorsanız si() (immutable signature) kullanın. Bu özellikle chain içinde bildirim görevleri gibi bağımsız işlemler için önemlidir:
from celery import chain
# send_alert önceki görevin çıktısını almaz, bağımsız çalışır
workflow = chain(
process_data.s(data_id),
transform_results.s(),
send_alert.si('İşlem tamamlandı'), # si() kullanımı
store_results.s(),
)
Canvas sonuçlarını temizleyin: Her görev sonucunun backend’de tutulması bellek kullanımını artırır. result_expires ayarını uygun şekilde yapılandırın ve chain(...).apply_async(expires=300) ile görev bazında expire süresi belirleyin.
Rate limiting: Dış API’lere yapılan çağrıları içeren group’larda mutlaka rate limiting uygulayın:
@app.task(rate_limit='100/m') # Dakikada en fazla 100 çağrı
def call_external_api(endpoint: str, payload: dict) -> dict:
pass
Sonuç
Celery Canvas, dağıtık sistemlerde iş akışı yönetimini temiz ve yönetilebilir hale getiren güçlü bir araç. chain, group ve chord üçlüsünü anladığınızda neredeyse her türlü görev bağımlılığını modelleyebilirsiniz.
Ancak bu gücün bir maliyeti var: doğru konfigürasyon ve operasyonel disiplin olmadan Canvas iş akışları debug edilmesi çok zor hale gelebilir. Flower gibi bir monitoring aracını mutlaka devreye alın, görev loglarını yapılandırılmış formatta tutun ve her iş akışı için end-to-end test yazın. Sonuçta bu görevler production’da sessizce çalışacak; onları görmezden geldiğinizde değil, bir şeyler ters gittiğinde fark edeceksiniz.
Bir sonraki adım olarak Celery Beat ile periyodik Canvas iş akışları oluşturmayı ve celery inspect komutuyla çalışan akışları gerçek zamanlı izlemeyi incelemenizi öneririm.
