Celery Beat ile Cron Benzeri Görev Zamanlama

Prodüksiyonda bir şeyin gece yarısı çalışmasını istiyorsunuz. Klasik çözüm: crontab’a bir satır ekle, unut git. Peki ya bu işin birden fazla sunucuda çalışması gerekiyorsa? Ya task başarısız olduğunda retry mekanizması istiyorsanız? Ya da hangi task’ın ne zaman çalıştığını merkezi bir yerden görmek istiyorsanız? İşte tam bu noktada Celery Beat devreye giriyor ve cron’un yetersiz kaldığı her yerde hayatınızı kurtarıyor.

Celery Beat Nedir, Neden Kullanmalısınız?

Celery Beat, Celery ekosisteminin zamanlama bileşenidir. Bir scheduler olarak çalışır ve tanımladığınız görevleri belirli aralıklarla Celery worker’larına iletir. Sistem cron’undan temel farkı şu: cron işi doğrudan çalıştırır, Beat ise görevi bir kuyruğa iter ve worker’lar bu görevi alıp işler.

Bu ayrım küçük gibi görünse de pratikte büyük fark yaratır. Göreviniz başarısız olduğunda retry edebilirsiniz. Görevlerin çalışma geçmişine bakabilirsiniz. Birden fazla worker aynı kuyruğu tüketebilir, yük dağılır. Ve en önemlisi, zamanlama mantığını uygulama kodunuzun içine alırsınız; sunucu değiştirdiğinizde crontab’ı unutup uygulama ayarlarını sıfırdan yapılandırmanız gerekmez.

Şunu da söyleyeyim: Beat, production ortamında tek instance olarak çalışmalıdır. Birden fazla Beat instance’ı aynı anda ayağa kaldırırsanız görevler mükerrer tetiklenir. Bu kritik bir detay ve kurulumda dikkat etmezseniz başınız ağrır.

Kurulum ve Temel Yapılandırma

Önce gerekli paketleri kuralım. Redis’i broker olarak kullanacağım, çünkü prodüksiyon ortamlarında en yaygın kombinasyon bu.

pip install celery redis celery[redis]
# Zamanlama veritabanı için django-celery-beat kullanacaksak
pip install django-celery-beat

Temel bir Celery uygulaması oluşturalım. Django kullanmayan saf Python projesi için:

# celery_app.py
from celery import Celery
from celery.schedules import crontab
import os

os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery('myproject')

app.config_from_object('celeryconfig')

app.conf.beat_schedule = {
    'gunluk-rapor-gonder': {
        'task': 'tasks.rapor_gonder',
        'schedule': crontab(hour=8, minute=0),  # Her gün sabah 08:00
        'args': ('gunluk',),
    },
    'her-5-dakikada-kontrol': {
        'task': 'tasks.sistem_kontrol',
        'schedule': 300.0,  # 300 saniye = 5 dakika
    },
    'haftalik-temizlik': {
        'task': 'tasks.eski_kayitlari_temizle',
        'schedule': crontab(hour=2, minute=0, day_of_week='monday'),
    },
}

app.conf.timezone = 'Europe/Istanbul'

celeryconfig.py dosyasını da hazırlayalım:

# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

timezone = 'Europe/Istanbul'
enable_utc = True

# Beat zamanlama geçmişini kaydetmek için
beat_scheduler = 'celery.beat:PersistentScheduler'
beat_schedule_filename = '/var/run/celery/celerybeat-schedule'

Şimdi task’larımızı tanımlayalım:

# tasks.py
from celery_app import app
import logging

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def rapor_gonder(self, rapor_tipi):
    try:
        logger.info(f"{rapor_tipi} raporu oluşturuluyor...")
        # Rapor oluşturma mantığı burada
        # email_gonder(rapor_tipi)
        logger.info(f"{rapor_tipi} raporu başarıyla gönderildi")
    except Exception as exc:
        logger.error(f"Rapor gönderilemedi: {exc}")
        raise self.retry(exc=exc)

@app.task
def sistem_kontrol():
    # Disk kullanımı, bellek durumu gibi kontroller
    import shutil
    disk = shutil.disk_usage('/')
    kullanim_yuzdesi = (disk.used / disk.total) * 100
    if kullanim_yuzdesi > 85:
        logger.warning(f"Disk kullanımı kritik: %{kullanim_yuzdesi:.1f}")
    return {'disk_kullanim': kullanim_yuzdesi}

@app.task
def eski_kayitlari_temizle():
    from datetime import datetime, timedelta
    kesim_tarihi = datetime.now() - timedelta(days=30)
    logger.info(f"{kesim_tarihi} tarihinden önceki kayıtlar temizleniyor")
    # Veritabanı temizleme işlemleri

Beat’i Ayağa Kaldırmak

Worker ve Beat’i ayrı ayrı çalıştırın. Tek process’te çalıştırma opsiyonu var ama production için önerilmiyor:

# Worker'ı başlat
celery -A celery_app worker --loglevel=info --concurrency=4

# Beat'i ayrı terminalde başlat
celery -A celery_app beat --loglevel=info

# Geliştirme için ikisini tek komutla çalıştırmak isterseniz (production'da KULLANMAYIN)
celery -A celery_app worker --beat --loglevel=info

Crontab İfadeleri ile İleri Zamanlama

Celery’nin crontab yardımcısı, Linux cron sözdizimiyle birebir örtüşür ve oldukça güçlüdür. Gerçek dünya senaryolarından örnekler:

from celery.schedules import crontab

beat_schedule = {
    # Her sabah 06:30'da
    'sabah-yedek': {
        'task': 'tasks.yedek_al',
        'schedule': crontab(hour=6, minute=30),
    },
    
    # Her saat başı
    'saatlik-senkron': {
        'task': 'tasks.veri_senkron',
        'schedule': crontab(minute=0),
    },
    
    # Pazartesi ve Cuma 09:00'da
    'is-gunu-rapor': {
        'task': 'tasks.rapor_gonder',
        'schedule': crontab(hour=9, minute=0, day_of_week='monday,friday'),
    },
    
    # Her ayın 1. ve 15. günü gece 01:00'de
    'aylik-fatura': {
        'task': 'tasks.fatura_olustur',
        'schedule': crontab(hour=1, minute=0, day_of_month='1,15'),
    },
    
    # Her 15 dakikada bir, sadece mesai saatlerinde (9-18)
    'mesai-kontrol': {
        'task': 'tasks.mesai_kontrol',
        'schedule': crontab(minute='*/15', hour='9-18'),
    },
    
    # Her Ocak ayının ilk günü
    'yillik-arsiv': {
        'task': 'tasks.yil_sonu_arsiv',
        'schedule': crontab(hour=0, minute=0, day_of_month=1, month_of_year=1),
    },
}

Django ile Entegrasyon: django-celery-beat

Saf Python projelerinde zamanlama beat_schedule sözlüğünde tanımlanır ve değiştirmek için kod düzenlemeniz gerekir. Django projelerinde ise django-celery-beat paketi bu süreci veritabanına taşır ve zamanlama görevlerini Django admin panelinden yönetebilirsiniz.

# settings.py
INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

# Celery ayarları
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TIMEZONE = 'Europe/Istanbul'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
python manage.py migrate django_celery_beat

Artık Django admin’den /admin/django_celery_beat/periodictask/ adresine girerek görev ekleyip çıkarabilir, enable/disable edebilir, çalışma sıklığını değiştirebilirsiniz. Kod commit’i gerekmez, deploy gerekmez. Özellikle operasyonel ekiplerin sıkça değiştirdiği görev zamanlamalarında bu esneklik çok değerlidir.

Programatik olarak görev oluşturmak için:

# Django management command veya migration içinde
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

# Önce zamanlama oluştur
schedule, created = CrontabSchedule.objects.get_or_create(
    hour=8,
    minute=30,
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
    timezone='Europe/Istanbul'
)

# Görevi oluştur
PeriodicTask.objects.get_or_create(
    name='Sabah Müşteri Raporu',
    defaults={
        'task': 'myapp.tasks.musteri_raporu_gonder',
        'crontab': schedule,
        'args': json.dumps(['haftalik']),
        'kwargs': json.dumps({'format': 'pdf'}),
        'enabled': True,
    }
)

Systemd ile Production Kurulumu

Geliştirme ortamında terminal açık tutmak işe yarar ama prodüksiyonda Beat ve Worker’ın sistem başladığında otomatik çalışması gerekir. Systemd servis dosyaları hazırlayalım:

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service
Requires=redis.service

[Service]
Type=forking
User=www-data
Group=www-data
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/myproject
ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} 
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} 
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} 
  --concurrency=${CELERYD_CONCURRENCY} ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} 
  --pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} 
  -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} 
  --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} 
  --concurrency=${CELERYD_CONCURRENCY} ${CELERYD_OPTS}'
Restart=always

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/celery-beat.service
[Unit]
Description=Celery Beat Scheduler
After=network.target redis.service celery-worker.service

[Service]
Type=simple
User=www-data
Group=www-data
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/myproject
ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} beat 
  --pidfile=${CELERYBEAT_PID_FILE} 
  --logfile=${CELERYBEAT_LOG_FILE} 
  --loglevel=${CELERYD_LOG_LEVEL} 
  --scheduler django_celery_beat.schedulers:DatabaseScheduler'
Restart=always

[Install]
WantedBy=multi-user.target
# /etc/conf.d/celery
CELERY_APP="myproject"
CELERY_BIN="/opt/myproject/venv/bin/celery"
CELERYD_NODES="worker1"
CELERYD_CONCURRENCY="4"
CELERYD_LOG_LEVEL="INFO"
CELERYD_LOG_FILE="/var/log/celery/worker.log"
CELERYD_PID_FILE="/var/run/celery/worker.pid"
CELERYBEAT_LOG_FILE="/var/log/celery/beat.log"
CELERYBEAT_PID_FILE="/var/run/celery/beat.pid"
CELERYD_OPTS="--time-limit=300 --soft-time-limit=240"
# Servisleri etkinleştir
sudo systemctl daemon-reload
sudo systemctl enable celery-worker celery-beat
sudo systemctl start celery-worker celery-beat
sudo systemctl status celery-beat

Gerçek Dünya Senaryosu: E-ticaret Platformu

Bir e-ticaret projesinde aşağıdaki gibi bir yapı kurmuştum. Birden fazla sunucuda çalışıyordu ve bu örnek Beat’in gücünü net gösteriyor:

# ecommerce/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(bind=True, max_retries=5, default_retry_delay=120)
def stok_guncelle(self, urun_id_listesi=None):
    """Tedarikçi API'sinden stok bilgilerini çeker"""
    try:
        from .services import TedarikciAPI
        api = TedarikciAPI()
        
        if urun_id_listesi is None:
            urunler = Urun.objects.filter(aktif=True)
        else:
            urunler = Urun.objects.filter(id__in=urun_id_listesi)
        
        guncellenen = 0
        for urun in urunler:
            stok = api.stok_sorgula(urun.tedarikci_kodu)
            urun.stok_adedi = stok
            urun.save(update_fields=['stok_adedi', 'guncelleme_tarihi'])
            guncellenen += 1
        
        logger.info(f"{guncellenen} ürün stok bilgisi güncellendi")
        return {'guncellenen': guncellenen}
        
    except Exception as exc:
        logger.error(f"Stok güncelleme hatası: {exc}")
        raise self.retry(exc=exc)


@shared_task
def terk_edilen_sepet_email(saat_esigi=2):
    """Belirli saatten uzun süredir terk edilen sepetler için email gönder"""
    from datetime import datetime, timedelta
    from .services import EmailServis
    
    esik = datetime.now() - timedelta(hours=saat_esigi)
    terk_edilen = Sepet.objects.filter(
        guncelleme_tarihi__lt=esik,
        email_gonderildi=False,
        kullanici__isnull=False
    ).select_related('kullanici')
    
    gonderilen = 0
    for sepet in terk_edilen:
        try:
            EmailServis.terk_edilen_sepet_gonder(sepet)
            sepet.email_gonderildi = True
            sepet.save(update_fields=['email_gonderildi'])
            gonderilen += 1
        except Exception as e:
            logger.error(f"Sepet email hatası (sepet_id={sepet.id}): {e}")
    
    return {'gonderilen': gonderilen}
# settings.py içinde beat_schedule
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'stok-gunde-uc-kez-guncelle': {
        'task': 'ecommerce.tasks.stok_guncelle',
        'schedule': crontab(hour='8,13,18', minute=0),
        'options': {'queue': 'dusuk_oncelik', 'expires': 3600},
    },
    'terk-sepet-saatlik-kontrol': {
        'task': 'ecommerce.tasks.terk_edilen_sepet_email',
        'schedule': crontab(minute=0),
        'args': (2,),
        'options': {'queue': 'email_kuyrugu'},
    },
}

options parametresine dikkat edin: expires ile bir görev kuyrukta işlenmeden belirli süre geçerse iptal edilir. Stok güncelleme task’ı 1 saat kuyrukta beklerse artık çalışmasının anlamı yok, bir sonraki çalışma zamanını beklesin.

İzleme ve Hata Ayıklama

Beat’in çalışıp çalışmadığını anlamak için log dosyasını takip etmek en basit yol:

# Beat log'larını canlı takip et
tail -f /var/log/celery/beat.log

# Worker log'larıyla birlikte izle
tail -f /var/log/celery/beat.log /var/log/celery/worker.log

# Son çalışan görevleri Redis'ten kontrol et (Celery inspect ile)
celery -A myproject inspect scheduled
celery -A myproject inspect active
celery -A myproject inspect reserved

# Tüm kayıtlı görevleri listele
celery -A myproject inspect registered

Flower kullanıyorsanız, Beat tarafından tetiklenen görevleri de gerçek zamanlı görebilirsiniz:

pip install flower
celery -A myproject flower --port=5555 --address=127.0.0.1

Sık Yapılan Hatalar ve Çözümleri

Production’da en çok karşılaştığım sorunları sıralayayım:

Timezone karmaşası: Beat UTC’de çalışır ama siz İstanbul saatini beklersiniz. timezone = 'Europe/Istanbul' ve enable_utc = True ayarlarını mutlaka yapın. Yeterli değil, crontab tanımlarınızın hangi zaman dilimine göre çalıştığını test edin.

Beat schedule kilitleme sorunu: Birden fazla Beat instance çalıştırırsanız görevler mükerrer tetiklenir. Kubernetes ortamında özellikle bu sorun yaşanır. Deployment yerine StatefulSet veya replicas: 1 ayarlı bir Deployment kullanın.

Görev birikimi: Beat her döngüde görevi kuyruğa iter ama worker yetişemiyorsa kuyruk şişer. expires ve max_retries ayarlarını doğru yapılandırın.

PersistentScheduler ve dosya izinleri: Beat, son çalışma zamanlarını bir dosyaya kaydeder. Bu dosyanın yazılabilir olduğundan emin olun.

# Zamanlama dosyası sorunlarını çöz
mkdir -p /var/run/celery
chown www-data:www-data /var/run/celery
chmod 755 /var/run/celery

# Beat'i temiz başlatmak için (zamanlama geçmişini sıfırla)
rm -f /var/run/celery/celerybeat-schedule
systemctl restart celery-beat

Sonuç

Celery Beat, sistem cron’unun yetersiz kaldığı her senaryoda güçlü bir alternatif sunar. Görev tekrarı mekanizması, merkezi yönetim, uygulama kodu ile entegrasyon ve ölçeklenebilirlik açısından bakıldığında, orta ve büyük ölçekli projelerde cron’un yerini alması son derece mantıklı.

Bununla birlikte, her şey için Beat kullanmak gerekmez. Basit bir sunucu bakım scripti, log rotation veya sistem düzeyinde bir görev hala cron için ideal kullanım alanlarıdır. Beat’i uygulama katmanındaki, veritabanı ve broker erişimi gerektiren, retry ve monitoring önemli olan işler için tercih edin.

Prodüksiyon kurulumunda şu üç maddeye özellikle dikkat edin: Beat’i tek instance olarak çalıştırın, timezone ayarlarını test edin, ve görev birikimini önlemek için expires değerlerini mutlaka tanımlayın. Bu üçünü doğru yaparsanız gerisi genellikle kendi kendine yürür.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir