Celery ile Zamanlanmış Görev (Periodic Task) Oluşturma

Üretim ortamında Celery kullanmaya başladığınızda, basit asenkron görevlerden sonra kaçınılmaz olarak şu soruyla karşılaşırsınız: “Bu görevi her gece çalıştırmak istiyorum, ne yapmalıyım?” İşte tam bu noktada Celery Beat devreye giriyor. Celery’nin zamanlama bileşeni olan Beat, cron’un Python dünyasındaki güçlü kardeşi olarak düşünülebilir. Ama sadece bir cron wrapper değil; dinamik görev tanımları, farklı zamanlama stratejileri ve merkezi yönetim gibi özellikleriyle çok daha fazlasını sunuyor.

Celery Beat Nedir ve Neden İhtiyaç Duyulur?

Klasik bir Linux cron job ile bir Python scriptini belirli aralıklarla çalıştırabilirsiniz, doğru. Ama bu yaklaşımın ciddi kısıtları var. Script her çalıştığında yeni bir Python process başlatılıyor, uygulama context’ini yeniden kurmanız gerekiyor, hata yönetimi zayıf, distributed sistemlerde ise birden fazla sunucuda aynı cron’un çalışması büyük bir baş ağrısı haline geliyor.

Celery Beat bu problemlerin tamamını çözüyor. Beat, bir scheduler process olarak çalışır ve görevleri doğru zamanda Celery worker’larına iletir. Worker’lar ise bu görevleri mevcut uygulama context’i içinde çalıştırır. Distributed mimaride tek bir Beat instance’ı yüzlerce worker’ı besleyebilir.

Tipik kullanım senaryoları şunlardır:

  • Raporlama görevleri: Her sabah 08:00’de satış raporu oluşturmak
  • Veri senkronizasyonu: Harici API’lerden saatlik veri çekmek
  • Temizlik işlemleri: Eski log kayıtlarını veya geçici dosyaları periyodik silmek
  • Health check: Sistemin belirli bileşenlerini düzenli kontrol etmek
  • E-posta bildirimleri: Haftalık özet mail göndermek

Kurulum ve Temel Yapılandırma

Önce gerekli paketleri kuralım. Broker olarak Redis kullanacağız çünkü Redis production ortamında daha yaygın ve yönetimi daha kolay:

pip install celery redis celery[redis]

# Beat için ek paket (veritabanı tabanlı zamanlama için)
pip install django-celery-beat  # Django kullananlar için
pip install celery[sqlalchemy]  # SQLAlchemy tabanlı backend için

Temel bir proje yapısı kuralım:

myproject/
├── celery_app.py
├── tasks.py
├── celeryconfig.py
└── requirements.txt

celery_app.py dosyasını oluşturalım:

from celery import Celery
from celery.schedules import crontab
import os

# Redis bağlantı ayarları
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')

app = Celery('myproject')

app.conf.update(
    broker_url=REDIS_URL,
    result_backend=REDIS_URL,
    timezone='Europe/Istanbul',
    enable_utc=True,

    # Beat zamanlama tanımları
    beat_schedule={
        'gunluk-rapor-gonder': {
            'task': 'tasks.gunluk_rapor_olustur',
            'schedule': crontab(hour=8, minute=0),  # Her sabah 08:00
            'args': (),
            'kwargs': {'format': 'pdf'},
        },
        'saatlik-veri-senkronizasyonu': {
            'task': 'tasks.harici_api_senkronize_et',
            'schedule': crontab(minute=0),  # Her saat başı
        },
        'besler-dakikada-bir': {
            'task': 'tasks.cache_yenile',
            'schedule': 60.0,  # Her 60 saniyede bir (saniye cinsinden float)
        },
    },
)

Zamanlama Türleri

Celery Beat’te üç temel zamanlama yöntemi var ve hangisini kullanacağınız senaryonuza göre değişiyor.

1. Sabit Aralıklı Zamanlama (timedelta / float)

En basit yöntem. Görevin her N saniyede, dakikada veya saatte bir çalışmasını istediğinizde kullanın:

from datetime import timedelta

beat_schedule = {
    'her-30-saniyede-bir': {
        'task': 'tasks.sistem_durumu_kontrol',
        'schedule': 30.0,  # saniye
    },
    'her-5-dakikada-bir': {
        'task': 'tasks.kuyruk_izle',
        'schedule': timedelta(minutes=5),
    },
    'her-iki-saatte-bir': {
        'task': 'tasks.gecici_dosya_temizle',
        'schedule': timedelta(hours=2),
    },
}

2. Crontab Zamanlama

Linux cron’a aşinaysanız, crontab sözdizimi size tanıdık gelecektir. Celery’nin crontab sınıfı oldukça esnek:

from celery.schedules import crontab

beat_schedule = {
    # Her gün 08:30'da
    'sabah-raporu': {
        'task': 'tasks.gunluk_rapor',
        'schedule': crontab(hour=8, minute=30),
    },

    # Her Pazartesi ve Perşembe 18:00'de
    'haftalik-ozet': {
        'task': 'tasks.haftalik_ozet_gonder',
        'schedule': crontab(hour=18, minute=0, day_of_week='monday,thursday'),
    },

    # Her ayın 1'i ve 15'inde 09:00'da
    'aylik-fatura': {
        'task': 'tasks.fatura_olustur',
        'schedule': crontab(hour=9, minute=0, day_of_month='1,15'),
    },

    # Her Cuma 23:59'da (hafta sonu yedeği için)
    'hafta-sonu-yedek': {
        'task': 'tasks.veritabani_yedekle',
        'schedule': crontab(hour=23, minute=59, day_of_week='friday'),
    },

    # Her gün gece yarısı
    'gece-yarisi-gorev': {
        'task': 'tasks.gun_sonu_islemleri',
        'schedule': crontab(hour=0, minute=0),
    },
}

crontab parametreleri:

  • minute: 0-59 arası dakika değeri, '*' tüm dakikalar
  • hour: 0-23 arası saat değeri
  • day_of_week: 0-6 arası (0=Pazar) veya isim olarak 'monday', 'tue'
  • day_of_month: 1-31 arası gün değeri
  • month_of_year: 1-12 arası ay değeri

3. Solar Zamanlama

Az bilinen ama çok kullanışlı bir özellik. Güneşin doğuş ve batış saatlerine göre görev zamanlayabilirsiniz. İstanbul koordinatlarıyla örnek:

from celery.schedules import solar

beat_schedule = {
    'gunes-dogusunda-calıs': {
        'task': 'tasks.sabah_bildirimi_gonder',
        'schedule': solar('sunrise', lat=41.0082, lon=28.9784),
    },
    'gunes-batisinda-calis': {
        'task': 'tasks.aksam_raporu_olustur',
        'schedule': solar('sunset', lat=41.0082, lon=28.9784),
    },
}

Gerçek Dünya Senaryosu: E-Ticaret Raporlama Sistemi

Somut bir örnek üzerinden gidelim. Bir e-ticaret platformunda çalıştığınızı varsayalım. Birden fazla periyodik görev tanımlamanız gerekiyor:

# tasks.py
from celery_app import app
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=300)
def gunluk_satis_raporu(self, tarih=None):
    """
    Her gün sabah 07:00'de bir önceki günün satış raporunu oluşturur
    ve yöneticilere e-posta gönderir.
    """
    try:
        if tarih is None:
            tarih = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

        logger.info(f"Satış raporu oluşturuluyor: {tarih}")

        # Veritabanından satış verilerini çek
        satislar = satis_verisi_getir(tarih)

        # PDF rapor oluştur
        rapor_yolu = pdf_rapor_olustur(satislar, tarih)

        # E-posta gönder
        yoneticilere_mail_gonder(
            konu=f"Günlük Satış Raporu - {tarih}",
            ek=rapor_yolu
        )

        logger.info(f"Rapor başarıyla gönderildi: {tarih}")
        return {'durum': 'basarili', 'tarih': tarih}

    except Exception as exc:
        logger.error(f"Rapor oluşturma hatası: {exc}")
        raise self.retry(exc=exc)


@app.task(bind=True, max_retries=5, default_retry_delay=60)
def stok_uyari_kontrol(self):
    """
    Her 15 dakikada bir stok seviyelerini kontrol eder.
    Kritik seviyenin altındaki ürünler için uyarı gönderir.
    """
    try:
        kritik_stoklar = dusuk_stok_urunleri_getir(esik=10)

        if kritik_stoklar:
            for urun in kritik_stoklar:
                logger.warning(
                    f"Kritik stok uyarısı: {urun['ad']} - "
                    f"Kalan: {urun['miktar']}"
                )
            satin_alma_ekibine_bildir(kritik_stoklar)

        return {'kontrol_edilen': len(kritik_stoklar)}

    except Exception as exc:
        raise self.retry(exc=exc)


@app.task
def yorum_moderasyon_kuyrugu():
    """
    Her saat başı onay bekleyen yorumları AI moderasyon sistemine gönderir.
    """
    bekleyen_yorumlar = onay_bekleyen_yorumlar_getir()
    logger.info(f"{len(bekleyen_yorumlar)} yorum moderasyona gönderildi")

    for yorum in bekleyen_yorumlar:
        ai_moderasyon_gorevi.delay(yorum['id'])

    return len(bekleyen_yorumlar)

Şimdi bu görevlerin zamanlamasını tanımlayalım:

# celeryconfig.py
from celery.schedules import crontab
from datetime import timedelta

beat_schedule = {
    'gunluk-satis-raporu': {
        'task': 'tasks.gunluk_satis_raporu',
        'schedule': crontab(hour=7, minute=0),
        'options': {
            'expires': 3600,  # 1 saat içinde çalışmazsa iptal et
            'queue': 'raporlama',  # Özel kuyruk
        }
    },

    'stok-uyari-kontrol': {
        'task': 'tasks.stok_uyari_kontrol',
        'schedule': timedelta(minutes=15),
        'options': {
            'queue': 'izleme',
            'priority': 8,  # Yüksek öncelik
        }
    },

    'yorum-moderasyon': {
        'task': 'tasks.yorum_moderasyon_kuyrugu',
        'schedule': crontab(minute=0),  # Her saat başı
        'options': {
            'queue': 'moderasyon',
        }
    },
}

# Timezone ayarı kritik! Türkiye saatini kullanın
timezone = 'Europe/Istanbul'
enable_utc = True

# Beat'in zamanlama durumunu kaydettiği dosya
beat_schedule_filename = '/var/run/celery/celerybeat-schedule'

Beat’i Başlatmak ve Yönetmek

Beat process’ini başlatmak için:

# Temel başlatma
celery -A celery_app beat --loglevel=info

# Worker ve Beat'i aynı anda başlatmak (geliştirme ortamı için)
# UYARI: Production'da bunu yapmayın!
celery -A celery_app worker --beat --loglevel=info

# Özel zamanlama dosyası konumu belirtmek
celery -A celery_app beat 
    --loglevel=info 
    --schedule=/var/run/celery/celerybeat-schedule 
    --pidfile=/var/run/celery/celerybeat.pid

# Production için ayrı worker'larla başlatın
celery -A celery_app worker --loglevel=info -Q raporlama,izleme,moderasyon &
celery -A celery_app beat --loglevel=info &

Önemli bir production uyarısı: Beat’in birden fazla instance’ını aynı anda çalıştırmayın. Bu durumda aynı görev birden fazla kez tetiklenebilir. Distributed ortamda Beat için leader election mekanizması kullanın veya Beat’i sadece tek bir sunucuda çalıştırın.

Systemd ile Production Kurulumu

Gerçek bir production ortamında Beat ve Worker’ları systemd service olarak tanımlamanız gerekiyor:

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
Type=forking
User=celery
Group=celery
WorkingDirectory=/opt/myproject
Environment="PATH=/opt/myproject/venv/bin"
ExecStart=/opt/myproject/venv/bin/celery 
    -A celery_app worker 
    --loglevel=info 
    --logfile=/var/log/celery/worker.log 
    --pidfile=/var/run/celery/worker.pid 
    -Q raporlama,izleme,moderasyon 
    --concurrency=4
ExecStop=/bin/kill -s TERM $MAINPID
ExecReload=/bin/kill -s HUP $MAINPID
Restart=always
RestartSec=10s

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/celery-beat.service
[Unit]
Description=Celery Beat Scheduler
After=network.target redis.service celery-worker.service

[Service]
Type=simple
User=celery
Group=celery
WorkingDirectory=/opt/myproject
Environment="PATH=/opt/myproject/venv/bin"
ExecStart=/opt/myproject/venv/bin/celery 
    -A celery_app beat 
    --loglevel=info 
    --logfile=/var/log/celery/beat.log 
    --pidfile=/var/run/celery/beat.pid 
    --schedule=/var/run/celery/celerybeat-schedule
Restart=always
RestartSec=10s

[Install]
WantedBy=multi-user.target

Servisleri etkinleştirin:

sudo systemctl daemon-reload
sudo systemctl enable celery-worker celery-beat
sudo systemctl start celery-worker celery-beat

# Durumu kontrol edin
sudo systemctl status celery-beat
sudo journalctl -u celery-beat -f

Dinamik Görev Ekleme: django-celery-beat

Statik yapılandırma dosyası yerine, görevleri çalışma zamanında veritabanından yönetmek isteyebilirsiniz. Django kullananlar için django-celery-beat bu işi kolaylaştırıyor:

pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
python manage.py migrate

Artık görevleri Django admin panelinden veya Python kodu üzerinden dinamik olarak ekleyebilirsiniz:

from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

# Crontab zamanlaması oluştur
zamanlama, olusturuldu = CrontabSchedule.objects.get_or_create(
    hour='9',
    minute='30',
    day_of_week='monday',
    day_of_month='*',
    month_of_year='*',
    timezone='Europe/Istanbul'
)

# Periyodik görevi kaydet
PeriodicTask.objects.update_or_create(
    name='Pazartesi sabah raporu',
    defaults={
        'crontab': zamanlama,
        'task': 'tasks.haftalik_rapor_gonder',
        'kwargs': json.dumps({'format': 'xlsx', 'alicilar': ['[email protected]']}),
        'enabled': True,
    }
)

Bu yaklaşımın güzelliği, uygulamayı yeniden başlatmadan görev zamanlamalarını değiştirebilmeniz. Özellikle müşterinin kendi rapor zamanlamasını belirleyebildiği SaaS uygulamalarında bu çok kritik bir özellik.

Sık Yapılan Hatalar ve Çözümleri

Timezone karışıklığı: Beat’i UTC ile çalıştırıp crontab’ı İstanbul saatiyle yazdığınızda görevler 3 saat gecikmeli çalışır. Çözüm: timezone = 'Europe/Istanbul' ayarını yapın ve tutarlı kalın.

Çift çalışma problemi: Birden fazla sunucuda Beat başlatmak görevlerin çift tetiklenmesine yol açar. Bunu önlemek için redbeat gibi Redis tabanlı distributed lock mekanizmaları kullanabilirsiniz:

pip install redbeat
# celeryconfig.py
CELERY_BEAT_SCHEDULER = 'redbeat.RedBeatScheduler'
REDBEAT_REDIS_URL = 'redis://localhost:6379/1'
REDBEAT_LOCK_TIMEOUT = 5 * 60  # 5 dakika lock timeout

Beat process çöktüğünde görevler çalışmıyor: Systemd Restart=always ile bu sorun büyük ölçüde çözülüyor. Ama yine de kritik görevler için monitoring ekleyin. Celery Flower veya Prometheus/Grafana kombinasyonu bu iş için idealdir.

Uzun süren görevler Beat kuyruğunu tıkıyor: Eğer bir görev bir sonraki tetiklenme zamanından uzun sürüyorsa, aynı görevin birden fazla instance’ı çakışabilir. Bunu önlemek için:

@app.task(bind=True)
def uzun_sure_calisacak_gorev(self):
    # Bu görevin aynı anda sadece bir instance'ı çalışsın
    if not edinmek_kilidi('uzun_gorev', timeout=3600):
        logger.info("Görev zaten çalışıyor, atlanıyor")
        return

    try:
        # Görev kodu buraya
        pass
    finally:
        kilidi_birak('uzun_gorev')

Görev İzleme ve Logging

Production ortamında görevlerin ne zaman çalıştığını, ne kadar sürdüğünü ve hata verip vermediğini takip etmek şart:

# Celery Flower ile web arayüzü
pip install flower

celery -A celery_app flower 
    --port=5555 
    --address=0.0.0.0 
    --basic_auth=admin:guvenli_sifre

# Beat loglarını gerçek zamanlı takip edin
tail -f /var/log/celery/beat.log | grep -E "(ERROR|WARNING|INFO)"

# Redis üzerindeki kuyruk durumunu kontrol edin
redis-cli llen celery  # Varsayılan kuyruktaki görev sayısı

Görevlerinize yapılandırılmış logging ekleyin:

import structlog

log = structlog.get_logger()

@app.task(bind=True)
def izlenen_gorev(self):
    log.info(
        "gorev_basladi",
        task_id=self.request.id,
        gorev_adi=self.name,
        zaman=datetime.now().isoformat()
    )

    baslangic = time.time()
    # Görev kodu...
    sure = time.time() - baslangic

    log.info(
        "gorev_tamamlandi",
        task_id=self.request.id,
        sure_saniye=round(sure, 2)
    )

Sonuç

Celery Beat, doğru yapılandırıldığında gerçekten güçlü bir araç. Ama bu güç beraberinde sorumluluk getiriyor. Timezone’lara dikkat edin, Beat’in tek instance çalışmasını garantileyin, kritik görevler için idempotency sağlayın ve her şeyi izleyin.

En çok gördüğüm sorun, geliştiricilerin Beat’i kurup sonra unutması. Bir görev hata veriyor ama kimse fark etmiyor çünkü monitoring yok. Production’a atmadan önce Flower’ı veya en azından temel bir alert mekanizmasını kurun.

Django kullananlar için django-celery-beat kesinlikle tercih edilmeli; statik config dosyası yerine veritabanı tabanlı dinamik zamanlama, özellikle büyüyen projelerde hayat kurtarıyor. Microservice mimarisinde ise RedBeat ile distributed scheduling kurgusu daha sağlıklı sonuçlar veriyor.

Son olarak şunu söyleyeyim: Celery Beat, cron’un yerini tam olarak alabilir mi? Birçok senaryoda evet. Ama her şeyi Celery’ye taşımak zorunda da değilsiniz. Basit bir log rotation için cron yeterli, ama uygulama context’i gerektiren, retry mekanizması isteyen ve merkezi yönetim arayan görevler için Celery Beat açık ara en iyi seçenek.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir