RabbitMQ Exchange Türleri: Direct, Fanout, Topic ve Headers
Mesaj kuyruğu sistemleriyle ilk kez ciddi anlamda boğuştuğumda, RabbitMQ’nun exchange kavramını anlamamın üç günümü aldığını itiraf etmeliyim. Dokümantasyona bakıyorsun, “mesaj publisher’dan consumer’a gider” deniyor, tamam, ama aradaki routing mantığı nerede? İşte o “arada” olan şey exchange’ler. Ve bu exchange’lerin dört farklı türü olduğunu öğrenince, hangisini ne zaman kullanacağını bilmek gerçekten kritik hale geliyor.
Bu yazıda RabbitMQ’nun dört temel exchange türünü; Direct, Fanout, Topic ve Headers’ı gerçek dünya senaryoları üzerinden inceleyeceğiz. Soyut tanımlardan çok, “şu iş yerinde şu sorunu böyle çözdük” perspektifinden bakacağız.
Exchange Nedir ve Neden Önemlidir?
RabbitMQ’da bir mesaj gönderdiğinizde, o mesaj doğrudan bir kuyruğa gitmez. Publisher mesajı bir exchange’e gönderir, exchange de binding kurallarına göre mesajı bir ya da birden fazla kuyruğa yönlendirir. Bu ayrım çok önemli: publisher hangi kuyruğa yazıldığını bilmek zorunda değildir.
Exchange’in routing kararını verirken kullandığı iki temel girdi vardır:
- Routing key: Publisher’ın mesajla birlikte gönderdiği bir string etiket
- Binding: Exchange ile kuyruk arasında tanımlanan ilişki ve bu ilişkideki kurallar
Bu yapının bize sağladığı en büyük fayda, publisher ile consumer arasındaki gevşek bağlılıktır (loose coupling). Yarın sisteminize yeni bir consumer eklemeniz gerektiğinde publisher koduna dokunmanıza gerek kalmaz.
RabbitMQ’da exchange oluştururken belirtmeniz gereken temel parametreler:
- Type: direct, fanout, topic, headers
- Durable: Broker yeniden başladığında exchange hayatta kalır mı?
- Auto-delete: Son binding silindiğinde exchange otomatik silinsin mi?
- Internal: Sadece diğer exchange’lerden mesaj alabilir, publisher doğrudan yazamaz
Şimdi her türü tek tek inceleyelim.
Direct Exchange
En basit ve en sık kullanılan türdür. Mantığı şudur: mesajın routing key’i ile binding’in routing key’i birebir eşleşiyorsa, mesaj o kuyruğa gider.
Gerçek dünya örneği olarak bir e-ticaret sistemini düşünelim. Sipariş durumu değiştiğinde farklı servisler farklı aksiyonlar alır: ödeme servisi onay bekler, kargo servisi “hazırlanıyor” durumunu bekler, bildirim servisi her değişikliği müşteriye iletir. Ama her servisin her mesajı alması anlamsız. İşte burada direct exchange devreye girer.
# RabbitMQ management CLI ile direct exchange oluşturma
rabbitmqadmin declare exchange name=order.events type=direct durable=true
# Kuyrukları oluştur
rabbitmqadmin declare queue name=payment.queue durable=true
rabbitmqadmin declare queue name=shipping.queue durable=true
rabbitmqadmin declare queue name=notification.queue durable=true
# Binding'leri tanımla
rabbitmqadmin declare binding source=order.events
destination=payment.queue routing_key=order.payment_pending
rabbitmqadmin declare binding source=order.events
destination=shipping.queue routing_key=order.preparing
rabbitmqadmin declare binding source=order.events
destination=notification.queue routing_key=order.payment_pending
rabbitmqadmin declare binding source=order.events
destination=notification.queue routing_key=order.preparing
rabbitmqadmin declare binding source=order.events
destination=notification.queue routing_key=order.shipped
Python ile basit bir publisher örneği:
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Siparişin ödeme beklediğini bildiren mesaj
channel.basic_publish(
exchange='order.events',
routing_key='order.payment_pending',
body='{"order_id": "12345", "amount": 299.90}',
properties=pika.BasicProperties(
delivery_mode=2, # Kalıcı mesaj
content_type='application/json'
)
)
print("Mesaj gönderildi: order.payment_pending")
connection.close()
Bu örnekte order.payment_pending routing key’iyle gönderilen mesaj hem payment.queue hem de notification.queue kuyruğuna gider çünkü her ikisi de bu routing key ile binding tanımlamış. shipping.queue ise bu mesajı almaz.
Direct exchange’in güzel yanı öngörülebilirliğidir. Hangi mesajın nereye gideceğini tam olarak bilirsiniz. Dezavantajı ise esneklik eksikliğidir; yeni bir routing pattern eklemek istediğinizde hem exchange’e hem de kuyruklara binding eklemeniz gerekir.
Default Exchange konusunu da atlamamak gerekir: RabbitMQ’da her kuyruk oluşturulduğunda, o kuyruğun adıyla aynı routing key’e sahip bir binding otomatik olarak varsayılan (default) exchange’e eklenir. Bu sayede herhangi bir exchange belirtmeden, sadece kuyruk adını routing key olarak kullanarak mesaj gönderebilirsiniz. Basit uygulamalar için kullanışlı ama production’da genellikle kaçınılması önerilir.
Fanout Exchange
Fanout exchange routing key’i tamamen yok sayar. Aldığı her mesajı, kendine bağlı tüm kuyruklara kopyalayarak dağıtır. Broadcast mantığı.
Ne zaman kullanırsınız? Bir olayı sistemdeki tüm ilgili servislerin aynı anda bilmesi gerektiğinde. Örneğin bir kullanıcı sistemden silindiğinde; oturum yönetimi servisi aktif session’ları kapatmalı, profil servisi verileri silmeli, e-posta servisi hesap silme onayı göndermeli, audit log servisi bu aksiyonu kaydetmeli. Bunların hepsine aynı mesajı iletmeniz gerekiyor.
# Fanout exchange oluştur
rabbitmqadmin declare exchange name=user.deleted type=fanout durable=true
# Her servis için kuyruk
rabbitmqadmin declare queue name=session.cleanup.queue durable=true
rabbitmqadmin declare queue name=profile.deletion.queue durable=true
rabbitmqadmin declare queue name=email.notification.queue durable=true
rabbitmqadmin declare queue name=audit.log.queue durable=true
# Binding'lerde routing_key belirtmeye gerek yok (yok sayılır)
rabbitmqadmin declare binding source=user.deleted
destination=session.cleanup.queue
rabbitmqadmin declare binding source=user.deleted
destination=profile.deletion.queue
rabbitmqadmin declare binding source=user.deleted
destination=email.notification.queue
rabbitmqadmin declare binding source=user.deleted
destination=audit.log.queue
import pika
import json
def publish_user_deleted_event(user_id: str):
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
message = json.dumps({
"user_id": user_id,
"timestamp": "2024-01-15T10:30:00Z",
"reason": "user_requested"
})
# Routing key belirtmiyoruz, fanout zaten dikkate almaz
channel.basic_publish(
exchange='user.deleted',
routing_key='', # Boş bırakılabilir
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json'
)
)
print(f"Kullanıcı silme eventi yayınlandı: {user_id}")
connection.close()
publish_user_deleted_event("user-789")
Fanout exchange’in operasyonel açıdan dikkat edilmesi gereken bir noktası var: Yeni bir servis sistemin fanout exchange’ine bağlandığı anda, o andan itibaren gelen tüm mesajları almaya başlar. Bu çoğu zaman istenen davranıştır, ama geçmiş mesajları almak istiyorsanız fanout tek başına yeterli değildir.
Topic Exchange
Topic exchange, direct ve fanout arasında bir denge noktasıdır. Routing key’leri nokta (.) ile ayrılmış kelimeler olarak değerlendirir ve iki özel wildcard karakter kullanır:
*(yıldız): Tam olarak bir kelimeyi karşılar#(kare): Sıfır ya da daha fazla kelimeyi karşılar
Bu yapı, karmaşık routing senaryolarını çok daha temiz bir şekilde ifade etmenizi sağlar. Ben bu exchange türünü öğrendiğimde, “neden daha önce bunu kullanmıyordum” diye düşünmüştüm.
Şöyle bir log yönetimi sistemi hayal edin. Microservice mimarisinde onlarca servis var ve her servisin farklı seviyede logları mevcut (info, warning, error, critical). Bazı log consumer’larının sadece error ve üzerini alması, bazılarının sadece belirli servislerin loglarını alması, bazılarının ise her şeyi alması gerekiyor.
# Topic exchange oluştur
rabbitmqadmin declare exchange name=application.logs type=topic durable=true
# Farklı ihtiyaçlar için kuyruklar
rabbitmqadmin declare queue name=all.logs.queue durable=true
rabbitmqadmin declare queue name=error.logs.queue durable=true
rabbitmqadmin declare queue name=payment.service.logs.queue durable=true
rabbitmqadmin declare queue name=critical.alerts.queue durable=true
# Tüm logları al: # her şeyi yakalar
rabbitmqadmin declare binding source=application.logs
destination=all.logs.queue routing_key="#"
# Sadece error ve üstü: *.error, *.critical gibi mesajları yakalar
# Ama daha pratik yaklaşım:
rabbitmqadmin declare binding source=application.logs
destination=error.logs.queue routing_key="*.error"
rabbitmqadmin declare binding source=application.logs
destination=error.logs.queue routing_key="*.critical"
# Sadece payment servisinin tüm logları
rabbitmqadmin declare binding source=application.logs
destination=payment.service.logs.queue routing_key="payment.*"
# Sadece critical alert'ler, hangi servisten olursa olsun
rabbitmqadmin declare binding source=application.logs
destination=critical.alerts.queue routing_key="*.critical"
Publisher tarafında routing key formatı servis_adi.log_seviyesi olarak tanımlanmış:
import pika
import json
from datetime import datetime
def log_to_rabbitmq(service_name: str, level: str, message: str):
"""
service_name: payment, inventory, user, order vb.
level: info, warning, error, critical
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
log_entry = json.dumps({
"service": service_name,
"level": level,
"message": message,
"timestamp": datetime.utcnow().isoformat()
})
routing_key = f"{service_name}.{level}"
channel.basic_publish(
exchange='application.logs',
routing_key=routing_key,
body=log_entry,
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json'
)
)
print(f"Log gönderildi [{routing_key}]: {message}")
connection.close()
# Örnekler
log_to_rabbitmq("payment", "error", "Ödeme gateway timeout")
# Bu mesajı alacaklar: all.logs.queue, error.logs.queue, payment.service.logs.queue
log_to_rabbitmq("inventory", "critical", "Veritabanı bağlantısı kesildi")
# Bu mesajı alacaklar: all.logs.queue, error.logs.queue, critical.alerts.queue
log_to_rabbitmq("payment", "info", "Yeni ödeme işlemi başlatıldı")
# Bu mesajı alacaklar: all.logs.queue, payment.service.logs.queue
Topic exchange’in # wildcard’ı çok güçlüdür ama dikkatli kullanılmalıdır. Bir kuyruğa # binding’i eklediğinizde, o kuyruk exchange’deki tüm mesajları almaya başlar; bu Fanout gibi davranması anlamına gelir. Farkında olmadan böyle bir binding oluşturursanız, o kuyruk beklenmedik yükle karşılaşabilir.
Headers Exchange
Headers exchange, dört tür içinde en az kullanılanı ama belki de en esnek olanıdır. Routing key’i tamamen görmezden gelir, bunun yerine mesajın başlık (header) alanlarına bakarak routing kararı verir.
Bir binding oluştururken iki özel header eklenir:
- x-match: all – Binding’deki tüm header’ların mesajda eşleşmesi gerekir (AND mantığı)
- x-match: any – Binding’deki header’lardan en az birinin eşleşmesi yeterlidir (OR mantığı)
Headers exchange ne zaman işe yarar? Routing mantığınız çok boyutlu olduğunda ve bu boyutları routing key string’ine sığdırmak zorlaştığında. Örneğin bir rapor oluşturma sistemi düşünelim: raporlar hem formatına (PDF, Excel, CSV) hem de departmana (finans, İK, operasyon) hem de önceliğine (yüksek, normal) göre farklı kuyruklara yönlendirilmeli.
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Headers exchange oluştur
channel.exchange_declare(
exchange='report.generation',
exchange_type='headers',
durable=True
)
# Kuyruklar
channel.queue_declare(queue='pdf.reports.queue', durable=True)
channel.queue_declare(queue='finance.reports.queue', durable=True)
channel.queue_declare(queue='urgent.reports.queue', durable=True)
# PDF formatındaki tüm raporlar (x-match: all ile tek koşul)
channel.queue_bind(
exchange='report.generation',
queue='pdf.reports.queue',
routing_key='',
arguments={
'x-match': 'all',
'format': 'pdf'
}
)
# Finans departmanının tüm raporları (format fark etmez)
channel.queue_bind(
exchange='report.generation',
queue='finance.reports.queue',
routing_key='',
arguments={
'x-match': 'all',
'department': 'finance'
}
)
# Acil raporlar: ya öncelik yüksek ya da finans departmanından
channel.queue_bind(
exchange='report.generation',
queue='urgent.reports.queue',
routing_key='',
arguments={
'x-match': 'any',
'priority': 'high',
'department': 'finance'
}
)
# PDF, finans departmanı için acil rapor gönder
channel.basic_publish(
exchange='report.generation',
routing_key='', # Headers exchange için önemli değil
body='{"report_id": "RPT-2024-001", "data": "..."}',
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json',
headers={
'format': 'pdf',
'department': 'finance',
'priority': 'high'
}
)
)
print("Rapor talebi gönderildi")
# Bu mesajı alacaklar: pdf.reports.queue, finance.reports.queue, urgent.reports.queue
connection.close()
Headers exchange’in dezavantajı performanstır. Her mesaj için header parsing yapılması gerektiğinden, yüksek throughput senaryolarında topic exchange’e kıyasla daha yavaş çalışır. Bu yüzden gerçekten ihtiyaç duyulmadıkça topic exchange’i tercih etmek daha mantıklıdır.
Exchange Seçerken Dikkat Edilmesi Gereken Operasyonel Konular
Production ortamında exchange yönetimi yaparken öğrendiğim bazı şeyleri paylaşmak istiyorum.
Dead Letter Exchange (DLX) entegrasyonu: Herhangi bir exchange türünü kullanırken, başarısız mesajlar için bir DLX tanımlamak iyi bir pratiktir.
# Dead letter exchange oluştur
rabbitmqadmin declare exchange name=dlx.exchange type=direct durable=true
# Dead letter kuyruğu
rabbitmqadmin declare queue name=dead.letter.queue durable=true
# Binding
rabbitmqadmin declare binding source=dlx.exchange
destination=dead.letter.queue routing_key=dead
# Ana kuyruğu DLX ile tanımla
rabbitmqadmin declare queue name=payment.queue durable=true
arguments='{"x-dead-letter-exchange": "dlx.exchange", "x-dead-letter-routing-key": "dead", "x-message-ttl": 86400000}'
Exchange silme işlemi: Bir exchange’i silmeden önce, o exchange’e bağlı publisher’ların olmadığından emin olun. Exchange silindiğinde, publisher’lar hata almaya başlar ve mesajlar kaybolabilir.
# Exchange istatistiklerini kontrol et
rabbitmqadmin list exchanges name type durable message_stats
# Belirli bir exchange'in binding'lerini listele
rabbitmqadmin list bindings source=order.events
Exchange federation: Birden fazla RabbitMQ node’u arasında mesaj taşımanız gerekiyorsa, RabbitMQ Federation Plugin’i kullanabilirsiniz. Bu özellikle coğrafi olarak dağıtık sistemlerde işe yarar.
Hangi Exchange Türünü Seçmeliyim?
Bu soruyu sıkça duyuyorum. Kısa bir rehber:
- Direct kullan: Mesajın belirli bir veya birkaç kuyruğa gitmesi gerektiğinde ve routing key’ler statik ve öngörülebilir olduğunda. Örneğin task queue senaryoları, belirli worker’lara iş dağıtımı.
- Fanout kullan: Bir eventi sistemdeki tüm ilgili tarafların bilmesi gerektiğinde. Gerçek anlamda broadcast ihtiyacınız varsa.
- Topic kullan: Routing mantığınız hiyerarşik veya çok boyutlu olduğunda ve bu boyutlar string ile ifade edilebiliyorsa. Büyük çoğunlukta bu seçim Topic veya Direct arasında olur.
- Headers kullan: Routing kararı birden fazla kriterle verilmeli ve bu kriterleri routing key string’ine sığdırmak anlamsız görünüyorsa. Ama performans kritik değilse.
Sonuç
RabbitMQ’nun exchange sistemi, ilk bakışta karmaşık görünse de üzerine düşündükçe ne kadar zarif tasarlandığını fark ediyorsunuz. Her exchange türü belirli bir problemi çözmek için var; doğru türü seçmek hem sisteminizin okunabilirliğini hem de bakım kolaylığını ciddi ölçüde artırıyor.
Pratikte en çok Direct ve Topic kullanıyorum. Fanout olayları broadcast etmek için harika, Headers ise çok spesifik senaryolara özel. Yeni bir sistem tasarlarken şu soruyu sormak iyi bir başlangıç noktası: “Bu mesajın gideceği yer, mesajın içeriğine mi yoksa mesajın türüne mi bağlı?” İçeriğe bağlıysa Headers, türüne bağlıysa diğer üç seçenekten birine yönelin.
Son olarak şunu söyleyeyim: Exchange seçimini ileride değiştirmek mümkün ama maliyetli. O yüzden tasarım aşamasında biraz daha fazla zaman harcamak, production’da ciddi migration sancılarından sizi kurtarır. Küçük bir PoC ortamında dört türü de deneyin, davranışlarını gözlemleyin ve ancak ondan sonra production tasarımına geçin.
