RabbitMQ ile Python Producer ve Consumer Yazımı
Mesaj kuyruklarıyla ilk kez çalışmaya başladığımda, bir REST API üzerinden senkron çağrılar yaparak “async” iş yaptığımı sanıyordum. Kulağa tanıdık geliyor mu? Sonra bir gün üretim ortamında bir servis 30 saniye boyunca yanıt vermedi ve arkasında bekleyen 400 istek birbirini domine etmeye başladı. O gün RabbitMQ’yu ciddiye almaya karar verdim. Bu yazıda Python ile gerçek bir producer-consumer mimarisi kuracağız, teoride değil pratikte ne işe yaradığını göreceğiz.
RabbitMQ Nedir, Neden Kullanıyoruz
RabbitMQ, AMQP (Advanced Message Queuing Protocol) protokolünü uygulayan açık kaynaklı bir mesaj broker’ıdır. Ama bu teknik tanımı bir kenara bırakalım. Pratik olarak şunu yapıyor: bir servisin “şimdi yapılamayan” ya da “başka bir servisin yapması gereken” işleri bir kuyruğa koymasını sağlıyor, diğer taraf da kendi hızında bu işleri tüketiyor.
Hangi durumlarda buna ihtiyaç duyarsınız?
- E-posta veya SMS göndermek gibi gecikmeye toleranslı işlemlerde
- Büyük dosya işleme, video dönüştürme gibi uzun süren görevlerde
- Mikroservis mimarisinde servisler arası haberleşmede
- Rate limiting gerektiren üçüncü parti API entegrasyonlarında
- Log agregasyonu ve event streaming senaryolarında
RabbitMQ’nun Kafka gibi alternatiflerine göre en büyük avantajı kurulum ve operasyonel basitliğidir. Küçük ve orta ölçekli sistemler için Kafka’nın getirdiği karmaşıklığa gerek yoktur çoğu zaman.
Kurulum ve Hazırlık
Docker ile hızlıca başlayalım. Üretim ortamı için ayrı bir bölümde konuşacağız ama geliştirme ortamı için Docker en pratik yol:
docker run -d
--name rabbitmq
-p 5672:5672
-p 15672:15672
-e RABBITMQ_DEFAULT_USER=admin
-e RABBITMQ_DEFAULT_PASS=gizli_sifre
rabbitmq:3.12-management
Port 5672 AMQP bağlantıları için, 15672 ise yönetim arayüzü için. Tarayıcıdan http://localhost:15672 adresine girdiğinizde kullanıcı adı ve şifrenizi girerek kuyrukları, exchange’leri ve mesaj akışını izleyebilirsiniz. Bu arayüzü açık tutmanızı şiddetle tavsiye ederim, debug süreçlerinde hayat kurtarıyor.
Python tarafında ihtiyacımız olan kütüphane pika. RabbitMQ’nun resmi Python istemcisi sayılır ve AMQP 0-9-1 protokolünü tam destekler:
pip install pika
Bağlantı testini basitçe yapabiliriz:
python3 -c "
import pika
conn = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, '/',
pika.PlainCredentials('admin', 'gizli_sifre'))
)
print('Bağlantı başarılı:', conn.is_open)
conn.close()
"
Temel Kavramları Anlamak
Kod yazmadan önce üç kavramı netleştirmek gerekiyor çünkü bunları atladığınızda yazdığınız kod çalışıyor ama neden çalıştığını bilmiyorsunuz, sorun çıktığında da nereye bakacağınızı bilemiyorsunuz.
Exchange: Producer mesajları doğrudan kuyruğa göndermez. Exchange’e gönderir. Exchange de routing kurallarına göre mesajı ilgili kuyruğa iletir. Dört tipi var: direct, fanout, topic, headers.
Queue: Mesajların bekletildiği yapı. Consumer buradan okur.
Binding: Exchange ile queue arasındaki bağlantı. Routing key bu bağlantıyı yönetir.
Routing Key: Mesajın hangi kuyruğa gideceğini belirleyen string değer.
En basit senaryoda default exchange kullanılır ve routing key doğrudan kuyruk adı olarak çalışır. Bu yüzden başlangıç örneklerinde exchange’den bahsedilmez, ama üretim ortamında muhtemelen custom exchange kullanacaksınız.
İlk Producer: Mesaj Göndermek
Bir e-ticaret sisteminde sipariş oluşturulduğunda bildirim gönderme senaryosunu düşünelim. Sipariş servisi mesajı kuyruğa koyar, bildirim servisi kuyruğu tüketir:
# producer.py
import pika
import json
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_connection():
credentials = pika.PlainCredentials('admin', 'gizli_sifre')
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
return pika.BlockingConnection(parameters)
def publish_order_notification(order_data: dict):
connection = None
try:
connection = get_connection()
channel = connection.channel()
# Kuyruk yoksa oluştur, varsa mevcut olanı kullan
channel.queue_declare(
queue='order_notifications',
durable=True # RabbitMQ yeniden başlasa bile kuyruk kalsın
)
message = json.dumps({
**order_data,
'timestamp': datetime.utcnow().isoformat(),
'version': '1.0'
})
channel.basic_publish(
exchange='', # Default exchange
routing_key='order_notifications',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Mesajı diske yaz (persistent)
content_type='application/json',
message_id=f"order_{order_data.get('order_id')}_{int(datetime.utcnow().timestamp())}"
)
)
logger.info(f"Mesaj gönderildi: order_id={order_data.get('order_id')}")
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"RabbitMQ bağlantı hatası: {e}")
raise
finally:
if connection and connection.is_open:
connection.close()
if __name__ == '__main__':
for i in range(1, 6):
order = {
'order_id': f'ORD-{1000 + i}',
'user_id': f'USR-{200 + i}',
'product': 'Laptop',
'amount': 15000 + (i * 500),
'status': 'created'
}
publish_order_notification(order)
Dikkat etmeniz gereken iki kritik nokta var. Birincisi durable=True parametresi: kuyruk tanımında bunu geçmezseniz RabbitMQ yeniden başladığında kuyruğunuz uçar. İkincisi delivery_mode=2: mesajın diske yazılmasını sağlar, yoksa broker çöktüğünde mesaj kaybolur. Her ikisini de birlikte kullanmanız gerekiyor, biri olmadan diğeri yetersiz kalır.
İlk Consumer: Mesaj Almak
# consumer.py
import pika
import json
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_order_notification(order_data: dict):
"""Gerçek iş burada yapılır: e-posta gönder, SMS at, vb."""
logger.info(f"İşleniyor: order_id={order_data.get('order_id')}, "
f"amount={order_data.get('amount')} TL")
# Simülasyon: gerçekte burada e-posta servisi çağrılır
time.sleep(0.5)
logger.info(f"Tamamlandı: order_id={order_data.get('order_id')}")
def on_message(channel, method, properties, body):
try:
order_data = json.loads(body)
process_order_notification(order_data)
# İşlem başarılıysa acknowledge gönder
channel.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError as e:
logger.error(f"JSON parse hatası: {e}, mesaj reddediliyor")
# Bozuk mesajı kuyruya geri koyma (requeue=False)
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"İşlem hatası: {e}, mesaj yeniden kuyruğa alınıyor")
# Geçici hata, tekrar dene
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def start_consumer():
credentials = pika.PlainCredentials('admin', 'gizli_sifre')
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=credentials,
heartbeat=600
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='order_notifications', durable=True)
# Aynı anda kaç mesaj alacağını belirle
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='order_notifications',
on_message_callback=on_message,
auto_ack=False # Manuel acknowledge kullan
)
logger.info("Consumer başlatıldı, mesaj bekleniyor...")
channel.start_consuming()
if __name__ == '__main__':
start_consumer()
auto_ack=False ve manuel basic_ack kullanımı kritik. Otomatik acknowledge açık olduğunda RabbitMQ mesajı consumer’a teslim ettiği anda siler. Consumer işlem sırasında çökerse mesaj gider. Manuel acknowledge ile siz “işledim, artık silebilirsin” diyene kadar RabbitMQ mesajı tutar.
prefetch_count=1 parametresi de önemli: consumer bir mesajı acknowledge etmeden yeni mesaj almaz. Birden fazla consumer çalıştırıyorsanız bu yük dengelemeyi adil hale getirir.
Exchange Tipleri ve Gerçek Kullanım Senaryosu
Direct exchange ile tek kuyruğa mesaj gönderdik. Ama gerçek dünyada genellikle daha karmaşık routing ihtiyaçları olur. Bir log sistemi düşünelim: aynı mesajın hem veritabanına yazılması hem de kritik olanların Slack’e gitmesi gerekiyor.
Bunun için topic exchange idealdir:
# topic_exchange_producer.py
import pika
import json
from datetime import datetime
def setup_exchange_and_queues(channel):
# Exchange oluştur
channel.exchange_declare(
exchange='app_logs',
exchange_type='topic',
durable=True
)
# Kuyrukları oluştur
channel.queue_declare(queue='all_logs', durable=True)
channel.queue_declare(queue='critical_logs', durable=True)
# Binding kuralları
# all_logs: tüm seviyeleri al
channel.queue_bind(
exchange='app_logs',
queue='all_logs',
routing_key='log.*'
)
# critical_logs: sadece error ve critical al
channel.queue_bind(
exchange='app_logs',
queue='critical_logs',
routing_key='log.error'
)
channel.queue_bind(
exchange='app_logs',
queue='critical_logs',
routing_key='log.critical'
)
def publish_log(channel, level: str, message: str, service: str):
routing_key = f'log.{level}'
payload = json.dumps({
'level': level,
'message': message,
'service': service,
'timestamp': datetime.utcnow().isoformat()
})
channel.basic_publish(
exchange='app_logs',
routing_key=routing_key,
body=payload,
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"[{level.upper()}] {service}: {message}")
if __name__ == '__main__':
conn = pika.BlockingConnection(
pika.ConnectionParameters('localhost',
credentials=pika.PlainCredentials('admin', 'gizli_sifre'))
)
ch = conn.channel()
setup_exchange_and_queues(ch)
publish_log(ch, 'info', 'Uygulama başlatıldı', 'order-service')
publish_log(ch, 'warning', 'Yavaş sorgu tespit edildi', 'db-service')
publish_log(ch, 'error', 'Ödeme servisi yanıt vermiyor', 'payment-service')
publish_log(ch, 'critical', 'Veritabanı bağlantısı koptu', 'core-service')
conn.close()
Bu örnekte log.info ve log.warning sadece all_logs kuyruğuna gider. log.error ve log.critical ise hem all_logs hem de critical_logs kuyruklarına düşer. critical_logs‘u tüketen consumer Slack’e bildirim gönderebilir, all_logs‘u tüketen consumer Elasticsearch’e yazabilir.
Bağlantı Yönetimi ve Retry Mekanizması
Üretim ortamında bağlantı kopmaları kaçınılmaz. Özellikle container ortamlarında RabbitMQ yeniden başlayabilir, ağ anlık olarak kesilebilir. Bunları yönetmek için robust bir consumer yazmak gerekiyor:
# robust_consumer.py
import pika
import json
import logging
import time
from typing import Callable
logger = logging.getLogger(__name__)
class RobustConsumer:
def __init__(self, host: str, port: int, username: str, password: str):
self.host = host
self.port = port
self.credentials = pika.PlainCredentials(username, password)
self.connection = None
self.channel = None
self.max_retries = 5
self.retry_delay = 5 # saniye
def connect(self):
retries = 0
while retries < self.max_retries:
try:
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
credentials=self.credentials,
heartbeat=600,
blocked_connection_timeout=300
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logger.info("RabbitMQ bağlantısı kuruldu")
return True
except pika.exceptions.AMQPConnectionError as e:
retries += 1
wait_time = self.retry_delay * retries
logger.warning(f"Bağlantı denemesi {retries}/{self.max_retries} başarısız. "
f"{wait_time}s sonra tekrar deneniyor. Hata: {e}")
time.sleep(wait_time)
logger.error("Maksimum yeniden deneme sayısına ulaşıldı")
return False
def consume(self, queue: str, callback: Callable, prefetch_count: int = 1):
if not self.connect():
raise RuntimeError("RabbitMQ'ya bağlanılamadı")
self.channel.queue_declare(queue=queue, durable=True)
self.channel.basic_qos(prefetch_count=prefetch_count)
self.channel.basic_consume(
queue=queue,
on_message_callback=callback,
auto_ack=False
)
while True:
try:
logger.info(f"'{queue}' kuyruğu dinleniyor...")
self.channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker:
logger.warning("Broker bağlantıyı kapattı, yeniden bağlanılıyor...")
if not self.connect():
break
self.channel.basic_consume(
queue=queue,
on_message_callback=callback,
auto_ack=False
)
except pika.exceptions.AMQPChannelError as e:
logger.error(f"Kanal hatası (kritik): {e}")
break
except KeyboardInterrupt:
logger.info("Consumer durduruluyor...")
self.channel.stop_consuming()
break
if self.connection and self.connection.is_open:
self.connection.close()
Dead Letter Queue: Başarısız Mesajları Yönetmek
Gerçek dünyada bazı mesajlar işlenemez. Bozuk format, eksik veri, üçüncü parti servis hatası… Bu mesajları sonsuza kadar yeniden denemek yerine bir “ölü mesaj kuyruğuna” (Dead Letter Queue) almak çok daha sağlıklı:
# dlq_setup.py
import pika
def setup_dlq_architecture():
conn = pika.BlockingConnection(
pika.ConnectionParameters('localhost',
credentials=pika.PlainCredentials('admin', 'gizli_sifre'))
)
ch = conn.channel()
# Önce dead letter exchange ve kuyruğu oluştur
ch.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
ch.queue_declare(queue='dead_letter_queue', durable=True)
ch.queue_bind(
exchange='dlx_exchange',
queue='dead_letter_queue',
routing_key='dead'
)
# Ana kuyruğu DLQ ayarlarıyla oluştur
ch.queue_declare(
queue='payment_processing',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 300000, # 5 dakika sonra süresi dolan mesajlar da DLQ'ya gider
'x-max-retries': 3
}
)
print("DLQ mimarisi kuruldu")
print("- Ana kuyruk: payment_processing")
print("- Ölü mesaj kuyruğu: dead_letter_queue")
conn.close()
if __name__ == '__main__':
setup_dlq_architecture()
DLQ’daki mesajları periyodik olarak inceleyip manuel müdahale gerektirenleri yeniden işleyebilirsiniz. Bu, kayıp mesaj sorununu tamamen ortadan kaldırır.
Üretim Ortamı İçin Pratik Notlar
Birkaç yıllık RabbitMQ deneyiminden süzülmüş pratik notlar:
- Virtual Host kullanın: Farklı uygulamalar için ayrı vhost oluşturun. İzolasyon sağlar, yönetimi kolaylaştırır. Default
/vhost’unu üretimde kullanmayın. - Bağlantı havuzu: Her mesaj için yeni bağlantı açmayın. Uygulama başlangıcında bir bağlantı açın, kanalları yeniden kullanın. Her AMQP bağlantısı TCP bağlantısıdır.
- Monitoring: RabbitMQ’nun Prometheus exporter’ı var,
rabbitmq_prometheusplugin’ini aktif edin. Kuyruk derinliği, mesaj hızı, unacked mesaj sayısı gibi metrikleri mutlaka izleyin. - Mesaj boyutu: RabbitMQ büyük mesajlar için optimize edilmemiştir. Büyük payload’lar için mesajı S3/MinIO’ya koyun, kuyruğa sadece referans gönderin.
- Cluster kurulumu: Üretimde tek node RabbitMQ çalıştırmayın. En az 3 node’lu bir cluster ve quorum queue kullanın. Mirrored queue eski yaklaşım, artık quorum queue tercih edilmeli.
# RabbitMQ yönetim komutları
# Kuyruk listesi
rabbitmqctl list_queues name messages consumers
# Kuyruk bilgisi
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Bağlantı listesi
rabbitmqctl list_connections user state
# Kanal listesi
rabbitmqctl list_channels
# Exchange listesi
rabbitmqctl list_exchanges name type durable
# Vhost oluşturma
rabbitmqctl add_vhost production
rabbitmqctl set_permissions -p production admin ".*" ".*" ".*"
Sonuç
RabbitMQ ile Python producer-consumer yazmak ilk bakışta basit görünüyor, ama detaylarda gizli tehlikeler var. durable=True ve delivery_mode=2 ikilisini unutmak, auto_ack=True ile çalışmak, bağlantı hatalarını yönetmemek, DLQ kurmamak… Bunların her biri gerçek veri kaybına yol açar.
Bu yazıda anlattıklarımı özetlersek: mesaj kalıcılığı için hem kuyruk hem mesaj seviyesinde dayanıklılığı etkinleştirin, manuel acknowledge kullanın, bağlantı kopmaları için retry mekanizması yazın, başarısız mesajlar için dead letter queue kurun ve topic exchange ile routing mantığını ihtiyacınıza göre modelleyin.
Geliştirme ortamında RabbitMQ Management UI’ı açık tutun ve mesaj akışını gözlemleyin. Kuyruğun dolduğunu, consumer’ların tıkandığını veya ack bekleyen mesajların biriktiğini bu arayüzden görürsünüz. Üretim ortamında da aynı metrikleri Prometheus/Grafana ile izlemek, olası sorunları önceden yakalamanızı sağlar.
Bir sonraki adım olarak Celery’yi incelemenizi öneririm. Celery arka planda RabbitMQ kullanır ve görev yönetimini çok daha yüksek seviyede soyutlar. Ancak Celery kullanmadan önce altta ne döndüğünü anlamak, sorun çıktığında büyük avantaj sağlar. Bu yazı tam olarak o temeli vermeyi amaçlıyordu.
