RabbitMQ Performans Optimizasyonu ve Prefetch Ayarları
Üretim ortamında RabbitMQ ile ciddi iş yükü çalıştırdıysanız, bir gün mutlaka şu soruyla yüzleşmişsinizdir: “Consumer’larım neden bu kadar yavaş?” ya da tam tersi “Broker neden bu kadar yavaş yanıt veriyor?” Çoğu zaman bu soruların cevabı prefetch ayarlarında ve birkaç temel performans parametresinde gizli. Bu yazıda bu konuyu derinlemesine ele alacağız.
Prefetch Nedir, Neden Bu Kadar Önemli?
RabbitMQ’da prefetch (QoS – Quality of Service), bir consumer’ın aynı anda kaç mesaj alabileceğini tanımlayan mekanizmadır. Bunu şöyle düşünebilirsiniz: bir restoranda garson olduğunuzu hayal edin. Mutfaktan kaç tabağı aynı anda taşıyabileceğinizi belirlemeniz gerekiyor. Çok az alırsanız verimsiz olursunuz; çok fazla alırsanız tabakları düşürebilirsiniz.
RabbitMQ’nun varsayılan davranışı, hiçbir limit koymadan tüm mesajları consumer’a push etmektir. Özellikle consumer’ınız her mesaj için ağır işlemler yapıyorsa (veritabanı sorgusu, harici API çağrısı vb.), bu durum felakete davetiye çıkarır. Consumer buffer’ı dolar, bellek tükenir ve sisteminiz çöker.
# Mevcut queue durumunu kontrol et
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers
# Daha detaylı çıktı için
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers memory state
Buradaki messages_unacknowledged değerine dikkat edin. Bu sayı sürekli yüksek kalıyorsa ve messages_ready sıfıra yakınsıyorsa, consumer’larınız mesajları alıyor ama işleyemiyor demektir. Prefetch probleminiz var.
Prefetch Değerini Nasıl Ayarlarsınız?
Teorik formül şöyle: prefetch = (ortalama_işlem_süresi_ms / network_round_trip_ms) * consumer_sayısı. Ama gerçek hayatta bu formül nadiren tek başına yeterli olur. Deneyimlerime dayanarak şunu söyleyebilirim: büyük çoğunlukta prefetch değerini 1’den başlatıp kademeli olarak artırmak, en güvenilir yöntemdir.
# Python pika kütüphanesi ile prefetch ayarı
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Prefetch count ayarı - kanal bazında
channel.basic_qos(prefetch_count=10)
# Global flag ile tüm consumer'lara uygula
# channel.basic_qos(prefetch_count=10, global_qos=True)
def callback(ch, method, properties, body):
# İşlemi yap
print(f"Mesaj alındı: {body.decode()}")
# Acknowledge gönder
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='gorev_kuyrugu',
on_message_callback=callback
)
channel.start_consuming()
Java tarafında Spring AMQP kullanıyorsanız ayar biraz farklı:
// Spring AMQP ile prefetch ayarı
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// Prefetch değeri
factory.setPrefetchCount(20);
// Concurrent consumer sayısı
factory.setConcurrentConsumers(5);
factory.setMaxConcurrentConsumers(15);
// Acknowledge modu
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
Peki prefetch değeri için bir altın kural var mı? Kesin bir rakam vermek zor ama şu kılavuz ilkeler işe yarar:
- Prefetch = 1: Her consumer aynı anda yalnızca bir mesaj alır. Fair dispatch garanti edilir ama throughput düşer. CPU yoğun işlemler için iyi.
- Prefetch = 10-50: Orta yüklü sistemler için makul başlangıç noktası. I/O bound işlemler için uygun.
- Prefetch = 100+: Hızlı işlemler, düşük gecikme gerektiren senaryolar için. Mesaj boyutu küçükse ve işlem süresi çok kısaysa değerlendirilebilir.
- Prefetch = 0 (sınırsız): Neredeyse hiçbir üretim senaryosunda önerilmez.
RabbitMQ Konfigürasyonu: Temel Performans Parametreleri
Sadece prefetch yeterli değil. Broker tarafındaki konfigürasyonlar da kritik öneme sahip. rabbitmq.conf dosyasını düzgün yapılandırmak, sistemin genel dayanıklılığı açısından çok önemli.
# /etc/rabbitmq/rabbitmq.conf
# Heartbeat aralığı (saniye)
heartbeat = 60
# Bağlantı başına kanal limiti
channel_max = 2047
# Mesaj kalıcılığı için disk watermark
disk_free_limit.absolute = 2GB
# Bellek watermark - toplam RAM'in yüzdesi
vm_memory_high_watermark.relative = 0.6
# VM memory watermark paging
vm_memory_high_watermark_paging_ratio = 0.5
# Bağlantı zaman aşımı
handshake_timeout = 10000
# Yönetim arayüzü için istatistik toplama aralığı (ms)
management.rates_mode = basic
collect_statistics_interval = 5000
Bu parametrelerin bir kısmı sıklıkla göz ardı edilir. Özellikle vm_memory_high_watermark çok önemlidir. Varsayılan değer 0.4’tür, yani toplam RAM’in %40’ı dolduğunda RabbitMQ akışı durdurmaya başlar (flow control). 16GB RAM’li bir sunucuda bu 6.4GB demektir. Eğer heavy consumer’larınız varsa bu değeri 0.6-0.7’ye çıkarmak mantıklı olabilir; ama dikkatli olun, OS için bellek bırakmak zorundasınız.
Lazy Queue vs Classic Queue: Hangisi Ne Zaman?
RabbitMQ 3.6+ ile gelen lazy queue özelliği, mesajları doğrudan diske yazar, bellekte tutmaz. Büyük miktarda mesaj birikme olasılığı olan sistemlerde bu yaklaşım hayat kurtarır.
# Lazy queue oluşturma
rabbitmqadmin declare queue name=buyuk_yuk_kuyrugu
durable=true
arguments='{"x-queue-mode":"lazy"}'
# Mevcut queue'yu lazy moda geçirme (rabbitmqctl ile)
rabbitmqctl set_policy LazyQueue
"^buyuk_yuk."
'{"queue-mode":"lazy"}'
--apply-to queues
# Python ile lazy queue oluşturma
channel.queue_declare(
queue='buyuk_yuk_kuyrugu',
durable=True,
arguments={
'x-queue-mode': 'lazy',
'x-message-ttl': 86400000, # 24 saat TTL
'x-max-length': 1000000 # Maksimum mesaj sayısı
}
)
Lazy queue kullanmanız gereken durumlar:
- Consumer’lar üretim hızının arkasında kalıyorsa ve kuyruk büyüyorsa
- Spike trafiği bekliyorsanız (kampanya, toplu işlem vb.)
- Mesaj kaybını göze alamıyorsanız ve persist etmek istiyorsanız
- Bellek kullanımını minimize etmek istiyorsanız
Classic queue tercih edeceğiniz durumlar:
- Düşük gecikme kritikse
- Mesaj sayısı az ve işlem hızlıysa
- Geçici mesajlarla çalışıyorsanız
Gerçek Dünya Senaryosu: E-Ticaret Sipariş İşleme
Bir e-ticaret projesi düşünün. Black Friday günü sipariş kuyruğuna dakikada 50.000 mesaj geliyor. Her mesaj; stok kontrolü, ödeme doğrulama ve kargo entegrasyonu adımlarından geçiyor. Ortalama işlem süresi 800ms.
Başlangıçta prefetch=0 ile çalışıyorlardı. Sistem ilk bir saatte çöktü. Consumer’lar bellek bitene kadar mesaj aldı ama işleyemedi.
Çözüm olarak şu konfigürasyon uygulandı:
import pika
import time
from concurrent.futures import ThreadPoolExecutor
def siparis_isle(ch, method, properties, body):
try:
# Sipariş işleme mantığı
siparis_id = body.decode()
# Stok kontrolü (ortalama 200ms)
stok_kontrol(siparis_id)
# Ödeme doğrulama (ortalama 400ms)
odeme_dogrula(siparis_id)
# Kargo entegrasyonu (ortalama 200ms)
kargo_baslat(siparis_id)
# Başarılı işlemde ack
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Hata durumunda negative ack - requeue=True
print(f"Hata: {e}")
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
def consumer_baslat(consumer_no):
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='rabbitmq-cluster',
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=5,
retry_delay=5
)
)
channel = connection.channel()
# 10 consumer için prefetch=5, toplam 50 eşzamanlı mesaj
channel.basic_qos(prefetch_count=5)
channel.basic_consume(
queue='siparis_kuyrugu',
on_message_callback=siparis_isle
)
print(f"Consumer {consumer_no} başladı")
channel.start_consuming()
# 10 paralel consumer
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(consumer_baslat, i)
Bu konfigürasyonla (10 consumer, her biri prefetch=5) toplam 50 mesaj eş zamanlı işlendi. Sistem stabilize oldu, bellek kullanımı öngörülebilir hale geldi.
Cluster Ortamında Performans Optimizasyonu
Tek node RabbitMQ ile cluster arasındaki fark sadece yüksek erişilebilirlik değil, aynı zamanda performanstır. Ama yanlış yapılandırılmış bir cluster, tek node’dan daha kötü performans verebilir.
# Cluster durumu kontrolü
rabbitmqctl cluster_status
# Node'lar arası senkronizasyon durumu
rabbitmqctl list_queues name slave_pids synchronised_slave_pids
# Mirror policy tanımlama - kritik queue'lar için
rabbitmqctl set_policy ha-kritik
"^kritik."
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
--priority 1
--apply-to queues
# Tüm queue'lar için ha policy (dikkatli kullanın - performans etkisi var)
rabbitmqctl set_policy ha-all
"^"
'{"ha-mode":"all","ha-sync-mode":"automatic"}'
Mirror queue’lar performansı etkiler çünkü her mesaj tüm mirror’lara yazılır. Eğer kritik değilse her queue’yu mirror’lamak gerekmez. “Exactly 2” politikası genellikle iyi bir denge noktasıdır: bir master ve bir mirror.
Dead Letter Queue ile Hata Yönetimi ve Performans
Dead letter queue (DLQ) sadece hata yönetimi için değil, sistem sağlığı açısından da önemli bir performans aracıdır. Sürekli başarısız olan mesajların ana kuyruğu bloke etmesini önler.
# Ana kuyruk DLQ ile birlikte oluşturma
channel.exchange_declare(
exchange='dlx_exchange',
exchange_type='direct',
durable=True
)
channel.queue_declare(
queue='dead_letter_queue',
durable=True
)
channel.queue_bind(
exchange='dlx_exchange',
queue='dead_letter_queue',
routing_key='dead_letter'
)
# Ana kuyruk - DLQ referansı ile
channel.queue_declare(
queue='islem_kuyrugu',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dead_letter',
'x-message-ttl': 30000, # 30 saniye TTL
'x-max-retries': 3 # Maksimum retry sayısı
}
)
# DLQ'daki mesajları periyodik olarak kontrol et
rabbitmqadmin get queue=dead_letter_queue count=10
# DLQ istatistiklerini izle
rabbitmqctl list_queues name messages consumers
| grep dead_letter
DLQ’yu düzgün yapılandırmak, “zehirli mesaj” (poison message) sorununu ortadan kaldırır. Bir mesaj işlenemiyorsa sonsuza kadar retry edilmek yerine DLQ’ya gider ve ana kuyruk temiz kalır.
Monitoring ve Alert: Performans Sorunlarını Erken Yakalamak
Prometheus ve Grafana ile RabbitMQ’yu izlemek günümüzde standart olmuş durumda. rabbitmq_prometheus eklentisi ile metrikleri kolayca toplayabilirsiniz.
# Prometheus eklentisini etkinleştir
rabbitmq-plugins enable rabbitmq_prometheus
# Metriklere erişim
curl http://localhost:15692/metrics | grep rabbitmq_queue
# Önemli metrikler:
# rabbitmq_queue_messages_ready - İşlenmeyi bekleyen mesajlar
# rabbitmq_queue_messages_unacked - Unacknowledged mesajlar
# rabbitmq_queue_consumer_utilisation - Consumer kullanım oranı
# rabbitmq_queue_messages_published_total - Toplam yayınlanan
Alert kuralları için şu eşik değerleri başlangıç noktası olarak işe yarar:
- messages_unacked / consumer_count > 100: Her consumer’da 100’den fazla unacked mesaj var, prefetch çok yüksek olabilir.
- consumer_utilisation < 0.5: Consumer’lar zamanlarının %50’sinden azını mesaj işleyerek geçiriyor, prefetch çok düşük veya consumer sayısı fazla.
- messages_ready > 10000 ve büyüme eğilimi var: Consumer’lar yetişemiyor, scale gerekiyor.
Bağlantı Havuzu ve Channel Yönetimi
Sıkça yapılan bir hata: her mesaj gönderiminde yeni connection açmak. Bu hem yavaş hem de RabbitMQ için yorucu.
# Bağlantı havuzu örneği - pika ile
import pika
from queue import Queue
import threading
class RabbitMQPool:
def __init__(self, host, pool_size=5):
self.host = host
self.pool_size = pool_size
self._pool = Queue(maxsize=pool_size)
self._lock = threading.Lock()
self._init_pool()
def _init_pool(self):
for _ in range(self.pool_size):
conn = self._create_connection()
self._pool.put(conn)
def _create_connection(self):
return pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
heartbeat=600,
blocked_connection_timeout=300
)
)
def get_channel(self):
connection = self._pool.get()
if not connection.is_open:
connection = self._create_connection()
channel = connection.channel()
return channel, connection
def return_connection(self, connection):
if connection.is_open:
self._pool.put(connection)
# Kullanım
pool = RabbitMQPool(host='localhost', pool_size=10)
def mesaj_gonder(mesaj):
channel, conn = pool.get_channel()
try:
channel.basic_publish(
exchange='',
routing_key='hedef_kuyruk',
body=mesaj,
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json'
)
)
finally:
channel.close()
pool.return_connection(conn)
Her channel için ayrı connection açmak yerine, connection’ları havuzda tutup channel’ları bu connection’lar üzerinden açmak hem kaynak tasarrufu sağlar hem de bağlantı kurulum süresini ortadan kaldırır.
Sonuç
RabbitMQ performans optimizasyonu aslında birkaç temel prensibin üzerine kuruludur: prefetch değerini sistemin işlem hızına göre ayarlamak, bellek ve disk watermark’larını gerçekçi biçimde yapılandırmak, dead letter mekanizmasıyla zehirli mesajları izole etmek ve her şeyi ölçümleyip izlemek.
Prefetch için altın kural yoktur; işlem sürenize, consumer sayınıza ve mesaj boyutuna göre değişir. Ancak şunu güvenle söyleyebilirim: prefetch=0 ile üretime çıkmayın. 1’den başlayın, benchmark alın, artırın, tekrar benchmark alın. Sisteminiz size doğru değeri söyler.
Cluster ortamında mirror policy’yi körü körüne tüm queue’lara uygulamaktan kaçının. Kritik olanları belirleyin ve sadece onları mirror’layın. Her şeyi mirror’lamak hem disk hem de ağ bant genişliğini tüketir.
Son olarak: RabbitMQ Management UI güzel ama yeterli değil. Prometheus metriklerini alın, Grafana ile görselleştirin ve anlamlı alertler kurun. Bir sorun üretimde patlak vermeden önce size bağırmaya başlayacak kadar erken fark etmek mümkün; tek şart sisteminizi düzgün izliyor olmak.
