RabbitMQ Shovel ve Federation ile Mesaj Yönlendirme
Dağıtık sistemlerle uğraşan herkesin eninde sonunda karşılaştığı soru şudur: “Farklı lokasyonlardaki RabbitMQ cluster’larım arasında mesajları nasıl taşıyacağım?” İki servisi birbirine bağlamak için doğrudan bağlantı kurmak çoğu zaman yeterli gelir, ama gerçek dünyada işler o kadar basit gitmiyor. Datacenter’lar arası latency, network bölünmeleri, farklı ortamlar arasındaki mesaj yönlendirme ihtiyaçları… Bunların hepsini çözmek için RabbitMQ’nun bize sunduğu iki güçlü araç var: Shovel ve Federation.
Bu yazıda bu iki mekanizmayı sadece teorik olarak değil, gerçek senaryolar üzerinden inceleyeceğiz. Hangisini ne zaman kullanacağınızı, nasıl yapılandıracağınızı ve production’da nelere dikkat etmeniz gerektiğini ele alacağız.
Temel Fark: Shovel mu Federation mu?
Bu soruyu yanlış anlamak, saatler süren hata ayıklama seanslarına yol açabilir. Aralarındaki farkı net koymak gerekirse:
Shovel, mesajları bir kaynaktan alıp başka bir hedefe ileten basit ama güçlü bir “taşıma” mekanizmasıdır. Queue’dan queue’ya, exchange’den exchange’e ya da birinin karışımına mesaj taşıyabilir. Mesajı kaynak taraftan tüketir, hedef tarafa publish eder. Bu kadar.
Federation ise daha sofistike bir “yayılım” mekanizmasıdır. Exchange veya queue seviyesinde çalışır ve upstream/downstream mantığıyla mesajların belirli koşullar altında taşınmasını sağlar. Özellikle exchange federation, consumer’ın nerede olduğunu umursamaksızın mesajın doğru cluster’a ulaşmasını garantiler.
Basit anlatmak gerekirse: Shovel bir nakliye kamyonu gibidir, mesajı alır götürür. Federation ise bir dağıtım ağı gibidir, mesajın nereye ihtiyaç duyulduğuna göre akar.
Plugin Aktivasyonu ve Temel Kurulum
Her iki mekanizma da RabbitMQ plugin sistemi üzerinde çalışır. Kurulumun ilk adımı plugin aktivasyonudur:
# Shovel plugin aktivasyonu
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# Federation plugin aktivasyonu
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
# Aktif pluginleri kontrol et
rabbitmq-plugins list | grep -E "shovel|federation"
Management UI üzerinden de bu pluginleri aktif edebilirsiniz ama scripted deployment yapıyorsanız CLI her zaman daha temiz bir yol sunar. Özellikle Ansible veya Terraform ile infrastructure yönetiyorsanız bu komutları ilgili task’lara eklemeniz yeterli.
Shovel Yapılandırması: Teoriden Pratiğe
Static Shovel ile Başlamak
Shovel yapılandırması iki şekilde yapılabilir: static (konfigürasyon dosyası üzerinden) ve dynamic (HTTP API veya Management UI üzerinden). Static shovel’lar node restart’larında otomatik olarak geri gelir, bu yüzden production için genellikle tercih edilir.
/etc/rabbitmq/rabbitmq.conf ya da eski format olan advanced.config üzerinden yapılandırılabilir. Modern kurulumlar için rabbitmq.conf tercih edin:
# /etc/rabbitmq/rabbitmq.conf içine eklenecek statik shovel örneği
# Not: Shovel için advanced.config hala daha esnek
cat > /etc/rabbitmq/advanced.config << 'EOF'
[
{rabbitmq_shovel,
[{shovels,
[{orders_to_warehouse,
[{source,
[{protocol, amqp091},
{uris, ["amqp://user:pass@source-rabbitmq:5672"]},
{declarations, [
{'queue.declare', [{queue, <<"orders">>}, {durable, true}]}
]},
{queue, <<"orders">>},
{consumer_args, []}
]},
{destination,
[{protocol, amqp091},
{uris, ["amqp://user:pass@dest-rabbitmq:5672"]},
{declarations, [
{'exchange.declare', [{exchange, <<"warehouse">>}, {type, <<"topic">>}, {durable, true}]},
{'queue.declare', [{queue, <<"warehouse.incoming">>}, {durable, true}]},
{'queue.bind', [{queue, <<"warehouse.incoming">>}, {exchange, <<"warehouse">>}, {routing_key, <<"#">>}]}
]},
{exchange, <<"warehouse">>},
{publish_properties, [{delivery_mode, 2}]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
}
].
EOF
# Konfigürasyonu doğrula ve servisi yeniden başlat
rabbitmq-diagnostics check_local_alarms
systemctl restart rabbitmq-server
Dynamic Shovel: HTTP API ile Çalışmak
Gerçek dünyada çoğu zaman shovel’ları runtime’da oluşturmanız gerekir. Belki bir migration işlemi yapıyorsunuz, belki geçici bir yönlendirme kuruyorsunuz. Dynamic shovel bunun için idealdir:
# Yeni bir dynamic shovel oluştur
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://source-rabbitmq:15672/api/parameters/shovel/%2F/prod-to-staging
-d '{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://user:pass@source-rabbitmq:5672",
"src-queue": "orders",
"src-prefetch-count": 100,
"dest-protocol": "amqp091",
"dest-uri": "amqp://user:pass@staging-rabbitmq:5672",
"dest-queue": "orders-staging",
"ack-mode": "on-confirm",
"reconnect-delay": 5,
"src-delete-after": "never"
}
}'
# Shovel durumunu kontrol et
curl -s -u admin:password
http://source-rabbitmq:15672/api/shovels
| python3 -m json.tool | grep -E "name|state"
# Belirli bir shovel'ı sil
curl -u admin:password
-X DELETE
http://source-rabbitmq:15672/api/parameters/shovel/%2F/prod-to-staging
src-delete-after parametresi özellikle dikkat gerektiriyor. never değeri shovel’ın sürekli çalışmasını sağlar. queue-length ile birlikte kullanırsanız, kaynak queue boşaldığında shovel otomatik olarak durur. Bu özellikle queue migration senaryolarında işe yarar.
Shovel ile Dead Letter Queue Yönetimi
Üretim ortamında sıkça karşılaştığım bir senaryo: işlenemeyen mesajları (dead letter) merkezi bir analiz sistemine göndermek. Her microservice’in kendi DLQ’su var ve bunları tek bir yerde toplamak istiyorsunuz:
# Her servis için DLQ'dan merkezi analiz sistemine shovel
for service in order-service payment-service inventory-service; do
curl -s -u admin:password
-X PUT
-H "Content-Type: application/json"
http://rabbitmq:15672/api/parameters/shovel/%2F/dlq-${service}
-d "{
"value": {
"src-protocol": "amqp091",
"src-uri": "amqp://user:pass@rabbitmq:5672",
"src-queue": "${service}.dlq",
"dest-protocol": "amqp091",
"dest-uri": "amqp://user:pass@analytics-rabbitmq:5672",
"dest-queue": "central-dlq",
"ack-mode": "on-confirm",
"src-prefetch-count": 50
}
}"
echo "Shovel created for ${service}"
done
Federation: Dağıtık Sistemlerin Gerçek Gücü
Federation, Shovel’dan çok daha karmaşık ama aynı zamanda çok daha güçlü senaryolara izin verir. İki modda çalışır: Exchange Federation ve Queue Federation.
Upstream Tanımlama
Federation’ın ilk adımı upstream’leri tanımlamaktır. Upstream, mesajların geldiği kaynağı temsil eder:
# Upstream tanımlama - DC1 için
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://dc2-rabbitmq:15672/api/parameters/federation-upstream/%2F/dc1-upstream
-d '{
"value": {
"uri": "amqp://user:password@dc1-rabbitmq:5672",
"expires": 3600000,
"message-ttl": 86400000,
"max-hops": 1,
"prefetch-count": 1000,
"reconnect-delay": 5,
"ack-mode": "on-confirm",
"trust-user-id": false
}
}'
# Upstream durumunu kontrol et
curl -s -u admin:password
http://dc2-rabbitmq:15672/api/federation-links
| python3 -m json.tool
max-hops parametresi kritik bir öneme sahip. Federation’da mesajlar federated exchange’ler arasında “hop” eder. Eğer bunu sınırlamazsanız, iki datacenter arasında birbirini upstream olarak gören federation konfigürasyonu oluşturduğunuzda mesajlar sonsuz döngüye girebilir. Genel kural: max-hops: 1 çoğu senaryo için yeterli.
Exchange Federation Politikası
Federation’ın ikinci adımı, hangi exchange’lerin federe edileceğini belirleyen politikaları oluşturmaktır:
# Belirli bir pattern'e uyan exchange'ler için federation policy
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://dc2-rabbitmq:15672/api/policies/%2F/federate-exchanges
-d '{
"pattern": "^federated\.",
"definition": {
"federation-upstream-set": "all"
},
"priority": 10,
"apply-to": "exchanges"
}'
# Belirli bir upstream ile çalışan policy
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://dc2-rabbitmq:15672/api/policies/%2F/federate-orders
-d '{
"pattern": "^orders",
"definition": {
"federation-upstream": "dc1-upstream"
},
"priority": 5,
"apply-to": "exchanges"
}'
Queue Federation: Ne Zaman Kullanmalı?
Queue federation, exchange federation’dan farklı çalışır ve çoğu zaman yanlış anlaşılır. Queue federation, consumer’ların birden fazla cluster üzerinde dağıtık halde çalıştığı senaryolar için tasarlanmıştır.
Diyelim ki İstanbul ve Ankara datacenter’larınız var. İstanbul’da üretilen mesajları, Ankara’daki consumer’lar tüketmek istiyor ama önce yerel consumer’ların önceliği olmasını istiyorsunuz. Queue federation tam olarak bunu yapar:
# Queue federation upstream (Ankara'daki RabbitMQ'da)
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://ankara-rabbitmq:15672/api/parameters/federation-upstream/%2F/istanbul-upstream
-d '{
"value": {
"uri": "amqp://user:password@istanbul-rabbitmq:5672",
"prefetch-count": 100,
"reconnect-delay": 10,
"ack-mode": "on-confirm",
"max-hops": 1
}
}'
# Queue federation policy
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://ankara-rabbitmq:15672/api/policies/%2F/federate-queues
-d '{
"pattern": "^shared\.",
"definition": {
"federation-upstream": "istanbul-upstream"
},
"priority": 10,
"apply-to": "queues"
}'
Queue federation’ın kritik bir davranışı var: Ankara’da aktif consumer yoksa, federation link İstanbul’dan mesaj çekmez. Consumer Ankara’ya bağlandığında federation devreye girer ve İstanbul’daki mesajları çekmeye başlar. Bu “on-demand” davranış hem avantaj hem dezavantaj olabilir.
Gerçek Dünya: Multi-Region E-Ticaret Senaryosu
Birlikte çalıştığım bir e-ticaret projesinde şu yapıyı kurmuştuk. İstanbul’da primary cluster, Frankfurt’ta disaster recovery cluster. Sipariş mesajları her iki cluster’da da işlenebilmeli ama çakışma olmamalıydı.
Yapı şöyle kurulmuştu:
# Frankfurt upstream tanımı (İstanbul'u upstream olarak gören Frankfurt RabbitMQ'da)
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://frankfurt-rabbitmq:15672/api/parameters/federation-upstream/%2F/istanbul-primary
-d '{
"value": {
"uri": "amqps://app:s3cur3p@ss@istanbul-rabbitmq:5671",
"expires": 1800000,
"max-hops": 1,
"prefetch-count": 500,
"reconnect-delay": 15,
"ack-mode": "on-confirm"
}
}'
# Sipariş exchange'leri için federation
curl -u admin:password
-X PUT
-H "Content-Type: application/json"
http://frankfurt-rabbitmq:15672/api/policies/%2F/order-federation
-d '{
"pattern": "^order\.",
"definition": {
"federation-upstream": "istanbul-primary",
"ha-mode": "all",
"ha-sync-mode": "automatic"
},
"priority": 10,
"apply-to": "all"
}'
# Monitoring için federation link durumunu takip et
watch -n 5 'curl -s -u admin:password http://frankfurt-rabbitmq:15672/api/federation-links | python3 -c "
import json, sys
links = json.load(sys.stdin)
for l in links:
print(f"{l['"'"'name'"'"']} -> {l['"'"'status'"'"']}: {l.get('"'"'error'"'"', '"'"'OK'"'"')}")"'
Monitoring ve Sorun Giderme
Production’da bu kadar karmaşık bir yapıyı kurunca monitoring kritik hale geliyor. Shovel ve Federation için özel metriklere odaklanmak gerekiyor:
#!/bin/bash
# shovel_health_check.sh - Cron ile her 5 dakikada çalıştır
RABBITMQ_HOST="localhost"
RABBITMQ_PORT="15672"
RABBITMQ_USER="admin"
RABBITMQ_PASS="password"
ALERT_WEBHOOK="https://hooks.slack.com/services/xxx/yyy/zzz"
# Shovel durumlarını kontrol et
SHOVELS=$(curl -s -u ${RABBITMQ_USER}:${RABBITMQ_PASS}
http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api/shovels)
# Terminated durumundaki shovel'ları bul
TERMINATED=$(echo $SHOVELS | python3 -c "
import json, sys
shovels = json.load(sys.stdin)
terminated = [s['name'] for s in shovels if s.get('state') == 'terminated']
if terminated:
print('ALERT: Terminated shovels: ' + ', '.join(terminated))
" 2>/dev/null)
if [ -n "$TERMINATED" ]; then
curl -s -X POST -H 'Content-type: application/json'
--data "{"text": "${TERMINATED}"}"
${ALERT_WEBHOOK}
fi
# Federation link kontrolü
FED_LINKS=$(curl -s -u ${RABBITMQ_USER}:${RABBITMQ_PASS}
http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api/federation-links)
DISCONNECTED=$(echo $FED_LINKS | python3 -c "
import json, sys
links = json.load(sys.stdin)
bad = [l['name'] for l in links if l.get('status') != 'running']
if bad:
print('ALERT: Disconnected federation links: ' + ', '.join(bad))
" 2>/dev/null)
if [ -n "$DISCONNECTED" ]; then
curl -s -X POST -H 'Content-type: application/json'
--data "{"text": "${DISCONNECTED}"}"
${ALERT_WEBHOOK}
fi
echo "Health check completed at $(date)"
Yaygın Sorunlar ve Çözümleri
Shovel veya Federation bağlantısı sürekli kesiliyor ve connection_refused hatası alıyorsanız:
- Firewall kuralları: 5672 (AMQP) veya 5671 (AMQPS) portlarının açık olduğunu doğrulayın
- Credential doğrulama: URI’deki kullanıcı adı ve şifrenin özel karakterler içermesi durumunda URL encoding gerekebilir
- vhost izinleri: Bağlantı kullanıcısının hedef vhost’ta gerekli izinlere sahip olduğunu kontrol edin
# Bağlantı sorunlarını debug et
rabbitmq-diagnostics check_running
rabbitmqctl list_connections | grep shovel
rabbitmqctl eval 'rabbit_shovel_status:status().'
# Federation link hata detayları
curl -s -u admin:password
http://rabbitmq:15672/api/federation-links
| python3 -c "
import json, sys
for link in json.load(sys.stdin):
if 'error' in link:
print(f"Link: {link['name']}, Error: {link['error']}")"
Performans Optimizasyonu
Shovel ve Federation’ın performansını etkileyen birkaç kritik parametre var:
prefetch-count: Shovel’ın bir seferde kaç mesaj çekeceğini belirler. Küçük mesajlar için 500-1000 değerleri makul. Büyük mesajlar için 50-100 ile başlayın.
ack-mode seçimi: Üç seçenek var:
on-confirm: En güvenli, en yavaş. Kaynak mesajı ancak hedef onayladıktan sonra siler.on-publish: Orta güvenlik. Hedef broker’a publish edilince kaynaktan siler.no-ack: En hızlı, en riskli. Kayıp mesaj riskini göze alabiliyorsanız kullanın.
Production’da neredeyse her zaman on-confirm ile gitmenizi öneririm. Performans farkı sandığınız kadar büyük değil ama veri kaybı riski çok daha ciddi bir problem.
reconnect-delay: Bağlantı kopması durumunda yeniden bağlanma gecikmesi. Çok düşük değer (1-2 saniye) cascade failure riskini artırır. 5-15 saniye arası genellikle iyi bir denge noktasıdır.
Shovel ile Federation Birlikte Kullanımı
Bu iki mekanizma birbirinin alternatifi değil, tamamlayıcısıdır. Gerçek dünyada ikisini birden kullandığınız senaryolar çok daha sık karşılaşılan durumdur:
Federation ile iki cluster arasında exchange seviyesinde mesaj akışı sağlayın, ancak belirli mesaj tiplerini ya da DLQ içeriklerini Shovel ile farklı bir hedefe taşıyın. Örneğin, federated exchange üzerinden akan payment mesajları bir log analiz sistemine de gidecekse, Shovel bu kopyalama işini üstlenebilir.
Önemli bir uyarı: Aynı mesaj üzerinde hem Federation hem Shovel çalışıyorsa, mesajın iki kez işlenme riski var. x-death header’larını ve max-hops değerlerini dikkatli yönetmeniz gerekiyor.
Sonuç
RabbitMQ Shovel ve Federation, dağıtık mesajlaşma mimarilerinin karmaşıklığını yönetmek için gerçekten güçlü araçlar. Ama her güçlü araç gibi, bunları da dikkatli ve bilinçli kullanmak gerekiyor.
Karar verirken şu basit soruları kendinize sorun: Mesajları fiziksel olarak bir noktadan başka bir noktaya taşımak mı istiyorsunuz? O zaman Shovel. Dağıtık consumer’ların bulundukları cluster’dan bağımsız olarak mesajlara ulaşmasını mı istiyorsunuz? O zaman Federation.
İkisini de production’a almadan önce mutlaka staging ortamında test edin. ack-mode seçiminizin, prefetch-count değerlerinizin ve max-hops limitlerinin nasıl davrandığını anlayın. Ve her şeyden önce monitoring’i ihmal etmeyin. Bu sistemler sessizce bozulabilir ve fark ettiğinizde mesaj kaybı yaşanmış olabilir.
Shovel’ı ilk kez production’da kullandığımda, on-publish ack-mode ile başlamıştım ve bir network partition sırasında yaklaşık 300 mesaj kayıp etmiştim. O günden beri on-confirm dışındaki seçeneği ciddiye almıyorum. Kendi deneyimlerinizde de bu tür küçük ama kritik detayların farkında olun.
