Celery ile Görev Önceliklendirme ve Rate Limiting

Prodüksiyonda bir e-ticaret platformu yönetiyorsunuz ve sabah 09:00’da siparişlerin yoğunlaştığı saatte sisteminiz çöküyor. Log’lara bakıyorsunuz: Celery worker’larınız raporlama görevleri yüzünden tıkanmış, kritik sipariş bildirimleri kuyruğun sonunda bekliyor. Bu tam olarak önceliklendirme ve rate limiting’i ciddiye almadığınızda başınıza gelen şey.

Bu yazıda Celery’de görev önceliklendirmeyi ve rate limiting’i gerçek üretim senaryoları üzerinden ele alacağız. “Celery’de priority nasıl set edilir” gibi yüzeysel bilgilerin ötesine geçerek, neden bazı yaklaşımların production’da işe yaramadığını ve neyin gerçekten çalıştığını konuşacağız.

Celery’de Önceliklendirme: Broker Bağımlılığı

İlk dikkat etmeniz gereken şu: Celery’nin önceliklendirme desteği broker’ınıza doğrudan bağlı. RabbitMQ ve Redis farklı davranır. RabbitMQ AMQP protokolü üzerinden gerçek önceliklendirme desteği sunarken, Redis’te bu biraz farklı çalışır.

RabbitMQ ile önceliklendirme:

RabbitMQ’da priority queue tanımlamak için queue’yu özel argümanlarla oluşturmanız gerekiyor:

# celery_config.py
from kombu import Queue

CELERY_QUEUES = (
    Queue(
        'critical',
        queue_arguments={'x-max-priority': 10}
    ),
    Queue(
        'default',
        queue_arguments={'x-max-priority': 5}
    ),
    Queue(
        'low',
        queue_arguments={'x-max-priority': 3}
    ),
)

CELERY_DEFAULT_QUEUE = 'default'
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

CELERY_WORKER_PREFETCH_MULTIPLIER = 1 burada kritik. Eğer bunu 1’den büyük bırakırsanız, worker görevleri önceden çekip buffer’a alıyor ve önceliklendirmeniz anlamını yitiriyor. Worker zaten 4 görevi çekmiş, sonradan gelen yüksek öncelikli görev sıraya giriyor ama başta bekleyen 4 görevi atlayamıyor.

# tasks.py
from celery import Celery

app = Celery('myapp')

@app.task(bind=True, max_retries=3)
def process_order_notification(self, order_id, user_id):
    """Kritik: Sipariş bildirimleri gecikemez."""
    try:
        send_order_email(order_id, user_id)
        update_order_status(order_id, 'notified')
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

@app.task
def generate_monthly_report(report_type, date_range):
    """Düşük öncelik: Rapor üretimi saatlerce sürebilir."""
    return build_report(report_type, date_range)

Görevleri gönderirken öncelik belirtmek:

# Yüksek öncelikli gönderim
process_order_notification.apply_async(
    args=[order_id, user_id],
    queue='critical',
    priority=9
)

# Düşük öncelikli rapor görevi
generate_monthly_report.apply_async(
    args=['monthly_sales', '2024-01'],
    queue='low',
    priority=1,
    countdown=300  # 5 dakika sonra başlasın
)

Redis ile durum biraz farklı:

Redis broker kullanıyorsanız, Celery’nin priority desteği redis-py kütüphanesinin sorted set yapısı üzerinden çalışıyor ve davranış RabbitMQ kadar belirleyici değil. Buna rağmen çoğu ekip Redis kullanmaya devam ediyor çünkü altyapısı zaten Redis üzerine kurulu.

Redis ile en pragmatik yaklaşım: Ayrı queue’lar, ayrı worker’lar.

# Kritik görevler için ayrı worker grubu başlatma
celery -A myapp worker 
    --queues=critical 
    --concurrency=8 
    --hostname=critical-worker@%h 
    --loglevel=info

# Normal görevler
celery -A myapp worker 
    --queues=default,low 
    --concurrency=4 
    --hostname=default-worker@%h

Bu yaklaşım hem Redis hem de RabbitMQ’da tutarlı çalışır. Critical queue’ya ayrılmış worker’lar her zaman kritik görevleri işler, normal worker’lar meşgul olsa bile.

Görev Yönlendirme: Router Kullanımı

Her apply_async çağrısında queue belirtmek hem zahmetli hem de hata yapmaya açık. Task router kullanarak bunu otomatize edebilirsiniz:

# routers.py
class TaskPriorityRouter:
    
    CRITICAL_TASKS = {
        'tasks.process_order_notification',
        'tasks.process_payment',
        'tasks.send_otp',
    }
    
    LOW_PRIORITY_TASKS = {
        'tasks.generate_monthly_report',
        'tasks.cleanup_old_sessions',
        'tasks.sync_analytics_data',
    }
    
    def route_for_task(self, task, args=None, kwargs=None, options=None,
                       task_=None, **kw):
        if task in self.CRITICAL_TASKS:
            return {
                'queue': 'critical',
                'priority': 9
            }
        elif task in self.LOW_PRIORITY_TASKS:
            return {
                'queue': 'low',
                'priority': 1
            }
        return {
            'queue': 'default',
            'priority': 5
        }
# celery_config.py içinde router'ı aktif etmek
CELERY_TASK_ROUTES = ('routers.TaskPriorityRouter',)

Router tanımlandıktan sonra görevlerinizi normal şekilde çağırabilirsiniz, routing otomatik yapılacak:

# Artık queue belirtmek zorunda değilsiniz
process_order_notification.delay(order_id, user_id)
generate_monthly_report.delay('monthly_sales', '2024-01')

Tabii özel durumlarda hala manuel override yapabilirsiniz. Router, apply_async içinde belirttiğiniz değerleri ezmez.

Rate Limiting: Sistemi Kendi Görevlerinizden Korumak

Rate limiting genellikle dışarıya çağrı yaparken (API, SMS servisi vb.) düşünülen bir şey ama aslında iç sistemleri de korumak için kritik.

Celery’nin built-in rate limiting mekanizması task başına tanımlanır:

@app.task(rate_limit='100/m')
def send_email_notification(user_id, template_id, context):
    """
    Dakikada maksimum 100 email gönderilsin.
    Email servisimiz 100/dk limitine sahip.
    """
    email_service.send(
        user_id=user_id,
        template=template_id,
        context=context
    )

@app.task(rate_limit='10/s')
def call_payment_api(transaction_id, amount):
    """Saniyede 10 çağrı - payment gateway limiti."""
    return payment_gateway.charge(transaction_id, amount)

@app.task(rate_limit='5/h')
def generate_heavy_report(report_id):
    """Saatte 5 ağır rapor - veritabanı yük limiti."""
    return report_engine.generate(report_id)

Rate limit formatları şunlardır:

  • 100/s: Saniyede 100 görev
  • 100/m: Dakikada 100 görev
  • 100/h: Saatte 100 görev

Rate limiting’i runtime’da değiştirmek de mümkün:

# Programatik olarak rate limit değiştirme
from celery.app.control import Control

control = Control(app)

# Tüm worker'larda email görevinin limitini düşür
control.rate_limit(
    'tasks.send_email_notification',
    '50/m',
    destination=['critical-worker@hostname']
)

Bu özellik özellikle gece maintenance pencerelerinde veya yoğun dönemlerde sistemi throttle etmek için kullanışlı.

Gerçek Dünya Senaryosu: E-ticaret Kara Cuma

Bir e-ticaret platformunda Kara Cuma sırasında yaşanan bir problemi ele alalım. Sistem normal günde sorunsuz çalışıyor, ama trafik 10 katına çıkınca her şey içler acısı bir hal alıyor.

Problem analizi: Tüm görevler tek queue’da, worker’lar raporlama görevleriyle meşgul, sipariş bildirimleri 30 dakika gecikmeli gidiyor.

Çözüm için önce görev kategorilerini belirlemek lazım:

Tier 1 – Kritik (milisaniyelik gecikme tolere edilemez):

  • Ödeme işlemi tamamlama bildirimi
  • OTP gönderimi
  • Stok rezervasyon bildirimi

Tier 2 – Normal (birkaç dakika tolere edilebilir):

  • Sipariş onay emaili
  • Kargo bildirimi
  • Stok güncelleme

Tier 3 – Düşük Öncelik (saatler içinde tamamlanabilir):

  • Satış raporları
  • Analytics senkronizasyonu
  • Toplu email kampanyaları
# celery_app.py - Production konfigürasyonu
from celery import Celery
from kombu import Queue, Exchange

app = Celery('ecommerce')

app.config_from_object({
    'BROKER_URL': 'amqp://celery:password@rabbitmq:5672//',
    'CELERY_RESULT_BACKEND': 'redis://redis:6379/1',
    
    'CELERY_QUEUES': (
        Queue('tier1', Exchange('tier1'), routing_key='tier1',
              queue_arguments={'x-max-priority': 10}),
        Queue('tier2', Exchange('tier2'), routing_key='tier2',
              queue_arguments={'x-max-priority': 5}),
        Queue('tier3', Exchange('tier3'), routing_key='tier3',
              queue_arguments={'x-max-priority': 2}),
    ),
    
    'CELERY_DEFAULT_QUEUE': 'tier2',
    'CELERY_WORKER_PREFETCH_MULTIPLIER': 1,
    'CELERY_TASK_ACKS_LATE': True,
    
    # Görev zaman aşımı limitleri
    'CELERY_TASK_SOFT_TIME_LIMIT': 300,
    'CELERY_TASK_TIME_LIMIT': 600,
})

Docker Compose ile çoklu worker stratejisi:

# docker-compose.yml'den ilgili servis tanımları

# Tier1 worker - 8 concurrent, sadece kritik görevler
celery -A ecommerce worker 
  --queues=tier1 
  --concurrency=8 
  --prefetch-multiplier=1 
  --hostname=tier1-worker@%h

# Tier2 worker - 4 concurrent, normal görevler
celery -A ecommerce worker 
  --queues=tier2 
  --concurrency=4 
  --hostname=tier2-worker@%h

# Tier3 worker - 2 concurrent, düşük öncelikli (geceleri daha fazla)
celery -A ecommerce worker 
  --queues=tier3 
  --concurrency=2 
  --hostname=tier3-worker@%h

Dinamik Rate Limiting: Adaptive Throttling

Sabit rate limit değerleri her zaman yeterli olmaz. Özellikle dış servis sağlayıcıların dinamik limit uyguladığı durumlarda adaptive bir yaklaşım gerekiyor.

# adaptive_rate_limiter.py
import redis
from functools import wraps
from celery import Task

redis_client = redis.Redis(host='redis', port=6379, db=2)

class AdaptiveRateLimitTask(Task):
    """
    Dış servis hatalarını izleyip rate limiti dinamik ayarlayan task base class.
    """
    abstract = True
    _rate_limit_key = None
    _error_threshold = 5
    _throttle_duration = 60
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if self._rate_limit_key and '429' in str(exc):
            # Rate limit hatası aldık, throttle'a gir
            error_count = redis_client.incr(
                f'rate_errors:{self._rate_limit_key}'
            )
            redis_client.expire(
                f'rate_errors:{self._rate_limit_key}',
                self._throttle_duration
            )
            
            if error_count >= self._error_threshold:
                # Tüm worker'larda bu task'ın rate limitini düşür
                new_limit = self._calculate_reduced_limit(error_count)
                self.app.control.rate_limit(
                    self.name,
                    new_limit
                )
    
    def _calculate_reduced_limit(self, error_count):
        base_limit = getattr(self, 'rate_limit', '100/m')
        value, period = base_limit.split('/')
        reduced = max(int(value) // 2, 10)
        return f'{reduced}/{period}'


@app.task(
    base=AdaptiveRateLimitTask,
    rate_limit='200/m',
    bind=True,
    _rate_limit_key='sms_api',
    max_retries=3
)
def send_sms_notification(self, phone, message, otp_code=None):
    try:
        result = sms_provider.send(phone, message)
        return result
    except RateLimitException as exc:
        # Exponential backoff ile retry
        raise self.retry(
            exc=exc,
            countdown=min(2 ** self.request.retries * 30, 300)
        )

Monitoring: Neyin Nerede Beklendiğini Bilmek

Tüm bu konfigürasyonların üretimde düzgün çalışıp çalışmadığını anlamak için monitoring şart.

Flower kurulumu ve özelleştirilmiş izleme:

# Flower'ı başlatma
celery -A myapp flower 
    --port=5555 
    --broker=amqp://celery:password@rabbitmq:5672// 
    --url_prefix=flower 
    --basic_auth=admin:secretpassword 
    --max_tasks=10000

Programatik monitoring için inspect API:

# monitoring.py
from celery.app.control import Inspect
import json

def get_queue_health_report(app):
    """
    Tüm worker'ların durumunu ve queue metriklerini döner.
    Alert sistemine beslemek için kullanılır.
    """
    inspector = app.control.inspect(timeout=5)
    
    report = {
        'active_tasks': {},
        'reserved_tasks': {},
        'queue_lengths': {},
        'worker_stats': {}
    }
    
    # Aktif görevler
    active = inspector.active()
    if active:
        for worker, tasks in active.items():
            report['active_tasks'][worker] = len(tasks)
    
    # Worker istatistikleri
    stats = inspector.stats()
    if stats:
        for worker, stat in stats.items():
            report['worker_stats'][worker] = {
                'total_tasks': stat.get('total', {}),
                'pool_size': stat.get('pool', {}).get('max-concurrency', 0),
            }
    
    return report

def check_queue_depth_alert(app, threshold=500):
    """
    Queue derinliği threshold'u aşarsa uyarı döner.
    Prometheus veya Grafana'ya entegre edilebilir.
    """
    report = get_queue_health_report(app)
    alerts = []
    
    for queue, length in report['queue_lengths'].items():
        if length > threshold:
            alerts.append({
                'queue': queue,
                'depth': length,
                'severity': 'critical' if length > threshold * 2 else 'warning'
            })
    
    return alerts

Yaygın Hatalar ve Kaçınma Yolları

Prodüksiyonda sık karşılaşılan sorunları ve çözümlerini listeleyelim:

Hata: Priority queue’yu prefetch_multiplier ile bozmak

Worker’ınız 4 görevi önceden çekip buffer’a aldıysa, sonradan gelen yüksek öncelikli görev bunların arkasında bekler. Çözüm: CELERY_WORKER_PREFETCH_MULTIPLIER = 1.

Hata: Rate limiting’i sadece task decorator’da tanımlamak

Task level rate limit, worker başına uygulanır. 5 worker’ınız varsa ve rate_limit='100/m' diyorsanız, sistemdeki toplam hız 500/m olabilir. Merkezi bir rate limiting mekanizması düşünün.

Hata: Task zaman aşımı ayarlamamak

Rate limited bir görev yüzünden tıkanan worker’lar diğer görevleri bloklar. Her göreve mutlaka soft_time_limit ve time_limit ekleyin:

@app.task(
    rate_limit='50/m',
    soft_time_limit=120,  # 2 dakika sonra SoftTimeLimitExceeded
    time_limit=180,       # 3 dakika sonra zorla öldür
    acks_late=True
)
def process_large_file(file_path):
    try:
        return file_processor.process(file_path)
    except SoftTimeLimitExceeded:
        # Temizlik yap ve çık
        file_processor.cleanup(file_path)
        raise

Hata: Tüm görevleri critical queue’ya koymak

“Her şey kritik” anlayışı önceliklendirmeyi anlamsız kılar. Gerçekten kritik olanı belirleyin. Bir rapor saatlik gecikirse iş durur mu? Durmuyor. Sipariş bildirimi gecikirse? Müşteri şikayet eder.

Sonuç

Celery’de önceliklendirme ve rate limiting aslında bir konfigürasyon meselesi değil, bir mimari tasarım meselesi. İşin teknik kısmı nispeten basit: queue’lar oluşturuyorsunuz, worker’ları yönlendiriyorsunuz, limitler koyuyorsunuz. Asıl zorluk sistemdeki hangi görevin ne kadar kritik olduğunu doğru analiz etmek.

Prodüksiyona gitmeden önce kendinize şu soruları sorun: Bu görev gecikmesi kullanıcıyı doğrudan etkiliyor mu? Dış servislerin limitleri nedir ve bunları hangi senaryolarda aşabilirim? Yük altında hangi görevler feda edilebilir?

Başlangıç için önerdiğim yaklaşım şu: Üç queue, üç worker grubu, prefetch_multiplier=1, acks_late=True. Bu kombinasyon zaten büyük çoğunluğunun ihtiyacını karşılar. Sonrasında monitoring kurarak gerçek darboğazları görün ve ona göre fine-tune edin. Gözlemlemeden optimizasyon, haritasız yol yapmak gibidir.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir