RabbitMQ ile Node.js Producer ve Consumer Yazımı
Mesaj kuyruklarıyla ilk kez çalıştığımda, “neden bu kadar karmaşık bir şeye ihtiyaç var ki?” diye düşünmüştüm. HTTP üzerinden direkt servisler arası iletişim kurarken her şey ne güzel çalışıyordu. Ta ki yük altında sistemler birbirine girene kadar. O günden bu yana RabbitMQ benim için vazgeçilmez bir araç haline geldi ve bugün Node.js ile nasıl etkili producer-consumer yapısı kurulacağını, gerçek üretim ortamından örneklerle aktaracağım.
RabbitMQ’ya Neden İhtiyaç Duyuyoruz?
Şöyle bir senaryo düşünün: bir e-ticaret sisteminde kullanıcı sipariş verdiğinde aynı anda fatura oluşturma, stok güncelleme, kargo bildirimi ve e-posta gönderme işlemlerinin tetiklenmesi gerekiyor. Bunların hepsini senkron HTTP çağrısıyla yapmak hem yavaş hem de kırılgan bir yapı oluşturur. Fatura servisi 2 saniye gecikirse kullanıcı 2 saniye bekler. Kargo servisi çökerse sipariş de başarısız sayılır.
İşte RabbitMQ tam bu noktada devreye giriyor. Sipariş oluşturulduğunda bir mesaj kuyruğa bırakılır, ilgili tüm servisler bu mesajı kendi tempolerında işler. Ana akış etkilenmez, servisler birbirinden bağımsız çalışır.
Ortamı Hazırlamak
Önce RabbitMQ’yu ayağa kaldıralım. Docker ile başlamak hem hızlı hem de production’a taşıma açısından tutarlı bir yaklaşım:
docker run -d
--name rabbitmq-dev
-p 5672:5672
-p 15672:15672
-e RABBITMQ_DEFAULT_USER=admin
-e RABBITMQ_DEFAULT_PASS=guclu_sifre_123
rabbitmq:3.12-management
# Konteyner durumunu kontrol et
docker ps | grep rabbitmq
# Logları izle
docker logs -f rabbitmq-dev
15672 portu management UI için, 5672 ise AMQP protokolü için kullanılıyor. Tarayıcıdan http://localhost:15672 adresine girip admin/guclu_sifre_123 ile bağlanabilirsiniz.
Node.js tarafında proje yapısını oluşturalım:
mkdir rabbitmq-demo && cd rabbitmq-demo
npm init -y
npm install amqplib dotenv uuid
# Proje yapısı
mkdir -p src/{producer,consumer,config}
touch src/config/rabbitmq.js
touch src/producer/orderProducer.js
touch src/consumer/orderConsumer.js
touch .env
.env dosyamızı hazırlayalım:
RABBITMQ_URL=amqp://admin:guclu_sifre_123@localhost:5672
QUEUE_NAME=order_queue
EXCHANGE_NAME=order_exchange
Bağlantı Yönetimi
RabbitMQ ile çalışırken en çok hata yapılan yer bağlantı yönetimidir. Her mesaj gönderiminde yeni bağlantı açmak, ya da bağlantı koptuğunda uygulamanın çökmesine izin vermek sık görülen sorunlar. Bağlantıyı merkezi bir modülden yönetelim:
// src/config/rabbitmq.js
const amqp = require('amqplib');
require('dotenv').config();
class RabbitMQConnection {
constructor() {
this.connection = null;
this.channel = null;
this.reconnectDelay = 5000;
this.maxRetries = 10;
}
async connect(retryCount = 0) {
try {
console.log(`RabbitMQ bağlantısı kuruluyor... (Deneme: ${retryCount + 1})`);
this.connection = await amqp.connect(process.env.RABBITMQ_URL);
this.channel = await this.connection.createChannel();
// Kanal prefetch ayarı - consumer için kritik
await this.channel.prefetch(10);
// Bağlantı koptuğunda otomatik yeniden bağlan
this.connection.on('error', (err) => {
console.error('RabbitMQ bağlantı hatası:', err.message);
this.reconnect();
});
this.connection.on('close', () => {
console.warn('RabbitMQ bağlantısı kapandı, yeniden bağlanılıyor...');
this.reconnect();
});
console.log('RabbitMQ bağlantısı başarılı');
return this.channel;
} catch (error) {
if (retryCount < this.maxRetries) {
console.error(`Bağlantı hatası: ${error.message}. ${this.reconnectDelay}ms sonra tekrar denenecek.`);
await this.sleep(this.reconnectDelay);
return this.connect(retryCount + 1);
}
throw new Error(`RabbitMQ bağlantısı ${this.maxRetries} denemeden sonra başarısız oldu`);
}
}
async reconnect() {
await this.sleep(this.reconnectDelay);
await this.connect();
}
async closeConnection() {
try {
await this.channel.close();
await this.connection.close();
} catch (err) {
console.error('Bağlantı kapatılırken hata:', err.message);
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
module.exports = new RabbitMQConnection();
prefetch(10) burada çok önemli. Consumer’ın aynı anda kaç mesajı işleyeceğini belirliyor. Bu olmadan RabbitMQ tüm mesajları tek consumer’a yığabilir, diğer consumer’lar boş kalır.
Producer Yazımı
Sipariş oluşturan producer’ımızı yazalım. Gerçek hayatta bu bir REST endpoint’ten tetiklenebilir:
// src/producer/orderProducer.js
const rabbitmq = require('../config/rabbitmq');
const { v4: uuidv4 } = require('uuid');
require('dotenv').config();
const EXCHANGE_NAME = process.env.EXCHANGE_NAME || 'order_exchange';
const QUEUE_NAME = process.env.QUEUE_NAME || 'order_queue';
async function setupExchangeAndQueue(channel) {
// Direct exchange oluştur - kalıcı olsun
await channel.assertExchange(EXCHANGE_NAME, 'direct', {
durable: true,
autoDelete: false
});
// Kuyruk oluştur - kalıcı ve dead letter queue ile
await channel.assertQueue(QUEUE_NAME, {
durable: true,
arguments: {
'x-dead-letter-exchange': 'order_dlx',
'x-message-ttl': 86400000, // 24 saat TTL
'x-max-length': 10000 // Maksimum mesaj sayısı
}
});
// Kuyruğu exchange'e bağla
await channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, 'order.created');
console.log(`Exchange ve kuyruk hazır: ${EXCHANGE_NAME} -> ${QUEUE_NAME}`);
}
async function publishOrder(orderData) {
const channel = await rabbitmq.connect();
await setupExchangeAndQueue(channel);
const message = {
messageId: uuidv4(),
timestamp: new Date().toISOString(),
type: 'order.created',
version: '1.0',
payload: orderData
};
const messageBuffer = Buffer.from(JSON.stringify(message));
// Mesajı yayınla
const published = channel.publish(
EXCHANGE_NAME,
'order.created',
messageBuffer,
{
persistent: true, // Broker restart'ta mesaj kaybolmasın
contentType: 'application/json',
messageId: message.messageId,
timestamp: Date.now(),
headers: {
'x-retry-count': 0,
'x-source-service': 'order-service'
}
}
);
if (published) {
console.log(`Sipariş mesajı gönderildi: ${message.messageId}`);
console.log(`Sipariş ID: ${orderData.orderId}`);
} else {
console.error('Mesaj gönderilemedi, kanal tamponu dolu!');
}
return message.messageId;
}
// Test için birkaç sipariş gönderelim
async function runProducer() {
const testOrders = [
{
orderId: uuidv4(),
customerId: 'CUST-001',
items: [
{ productId: 'PRD-101', quantity: 2, price: 149.99 },
{ productId: 'PRD-205', quantity: 1, price: 89.90 }
],
totalAmount: 389.88,
currency: 'TRY',
shippingAddress: {
city: 'İstanbul',
district: 'Kadıköy'
}
},
{
orderId: uuidv4(),
customerId: 'CUST-002',
items: [
{ productId: 'PRD-301', quantity: 3, price: 55.00 }
],
totalAmount: 165.00,
currency: 'TRY',
shippingAddress: {
city: 'Ankara',
district: 'Çankaya'
}
}
];
console.log('Producer başlatıldı...');
for (const order of testOrders) {
await publishOrder(order);
// Mesajlar arası küçük bir bekleme
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('Tüm siparişler kuyruğa gönderildi');
// Bağlantıyı temiz kapat
setTimeout(async () => {
await rabbitmq.closeConnection();
process.exit(0);
}, 2000);
}
runProducer().catch(err => {
console.error('Producer hatası:', err);
process.exit(1);
});
persistent: true seçeneğini atlamayın. RabbitMQ yeniden başladığında mesajlar diskten geri yüklenir. Bunu kullanmadan çalışan bir sistem, broker restart’ta tüm bekleyen siparişleri kaybeder.
Consumer Yazımı
Consumer tarafı biraz daha dikkat gerektiriyor. Acknowledgement yönetimi burada kritik:
// src/consumer/orderConsumer.js
const rabbitmq = require('../config/rabbitmq');
require('dotenv').config();
const EXCHANGE_NAME = process.env.EXCHANGE_NAME || 'order_exchange';
const QUEUE_NAME = process.env.QUEUE_NAME || 'order_queue';
// Simüle edilmiş sipariş işleme servisleri
const orderProcessors = {
async createInvoice(order) {
console.log(` [Fatura] Sipariş ${order.orderId} için fatura oluşturuluyor...`);
await sleep(800);
// %10 ihtimalle hata fırlat (gerçek hayat simülasyonu)
if (Math.random() < 0.1) {
throw new Error(`Fatura servisi geçici hata: ${order.orderId}`);
}
console.log(` [Fatura] Fatura oluşturuldu: INV-${Date.now()}`);
return { invoiceId: `INV-${Date.now()}`, status: 'created' };
},
async updateStock(order) {
console.log(` [Stok] ${order.items.length} ürün için stok güncelleniyor...`);
await sleep(300);
console.log(` [Stok] Stok güncellendi`);
return { updated: order.items.length };
},
async sendNotification(order) {
console.log(` [Bildirim] Müşteri ${order.customerId} için bildirim gönderiliyor...`);
await sleep(200);
console.log(` [Bildirim] E-posta gönderildi`);
return { notificationSent: true };
}
};
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function processOrderMessage(channel, msg) {
if (!msg) return;
let parsedMessage;
try {
parsedMessage = JSON.parse(msg.content.toString());
} catch (parseError) {
console.error('Geçersiz JSON formatı, mesaj reddediliyor:', parseError.message);
// Parse edilemeyen mesajı kuyruğa geri koyma
channel.nack(msg, false, false);
return;
}
const { messageId, payload: order, headers = {} } = parsedMessage;
const retryCount = (msg.properties.headers && msg.properties.headers['x-retry-count']) || 0;
console.log(`nSipariş işleniyor: ${messageId}`);
console.log(`Retry sayısı: ${retryCount}`);
console.log(`Sipariş ID: ${order.orderId}`);
console.log(`Toplam Tutar: ${order.totalAmount} ${order.currency}`);
try {
// Tüm alt işlemleri paralel çalıştır
const [invoiceResult, stockResult, notificationResult] = await Promise.all([
orderProcessors.createInvoice(order),
orderProcessors.updateStock(order),
orderProcessors.sendNotification(order)
]);
console.log(`Sipariş başarıyla işlendi: ${order.orderId}`);
console.log(` Fatura: ${invoiceResult.invoiceId}`);
// Başarılı işlemde acknowledge gönder
channel.ack(msg);
} catch (processingError) {
console.error(`Sipariş işleme hatası: ${order.orderId}`, processingError.message);
const maxRetries = 3;
if (retryCount < maxRetries) {
console.log(`Mesaj yeniden kuyruğa alınıyor (${retryCount + 1}/${maxRetries})...`);
// Exponential backoff ile yeniden gönder
const delay = Math.pow(2, retryCount) * 1000;
await sleep(delay);
// Güncellenmiş retry sayısıyla mesajı tekrar yayınla
channel.publish(
EXCHANGE_NAME,
'order.created',
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount + 1,
'x-last-error': processingError.message,
'x-last-retry-at': new Date().toISOString()
}
}
);
// Orijinal mesajı acknowledge et (döngüyü kır)
channel.ack(msg);
} else {
console.error(`Maksimum retry aşıldı, mesaj dead letter kuyruğuna gönderiliyor: ${order.orderId}`);
// requeue: false ile dead letter exchange'e gönder
channel.nack(msg, false, false);
}
}
}
async function startConsumer() {
const channel = await rabbitmq.connect();
// Exchange ve kuyruk varlığını garantile
await channel.assertExchange(EXCHANGE_NAME, 'direct', { durable: true });
await channel.assertQueue(QUEUE_NAME, {
durable: true,
arguments: {
'x-dead-letter-exchange': 'order_dlx',
'x-message-ttl': 86400000,
'x-max-length': 10000
}
});
await channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, 'order.created');
console.log(`Consumer başlatıldı. Kuyruk dinleniyor: ${QUEUE_NAME}`);
console.log('Mesaj bekleniyor... (Çıkmak için CTRL+C)');
// noAck: false - manuel acknowledge kullanıyoruz
channel.consume(QUEUE_NAME, (msg) => {
processOrderMessage(channel, msg);
}, { noAck: false });
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('nConsumer kapatılıyor...');
await rabbitmq.closeConnection();
process.exit(0);
});
}
startConsumer().catch(err => {
console.error('Consumer başlatma hatası:', err);
process.exit(1);
});
Dead Letter Queue Kurulumu
İşlenemeyen mesajların kaybolmaması için DLQ yapısı kurmalıyız. Bu gerçek production ortamlarında ihmal edilemez:
// src/config/setupQueues.js
const rabbitmq = require('./rabbitmq');
async function setupAllQueues() {
const channel = await rabbitmq.connect();
// Dead Letter Exchange
await channel.assertExchange('order_dlx', 'direct', { durable: true });
// Dead Letter Queue
await channel.assertQueue('order_dead_letter', {
durable: true,
arguments: {
'x-message-ttl': 604800000 // 7 gün - incelemek için vakit
}
});
await channel.bindQueue('order_dead_letter', 'order_dlx', 'order.created');
// Ana exchange ve kuyruk
await channel.assertExchange('order_exchange', 'direct', { durable: true });
await channel.assertQueue('order_queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'order_dlx',
'x-message-ttl': 86400000,
'x-max-length': 10000
}
});
await channel.bindQueue('order_queue', 'order_exchange', 'order.created');
console.log('Tüm exchange ve kuyruklar yapılandırıldı');
await rabbitmq.closeConnection();
}
setupAllQueues().catch(console.error);
Çalıştırma ve Test
# Önce kuyrukları kur
node src/config/setupQueues.js
# Consumer'ı başlat (ayrı terminal)
node src/consumer/orderConsumer.js
# Producer'ı çalıştır (başka bir terminal)
node src/producer/orderProducer.js
# Birden fazla consumer başlatmak için (yük dağılımı için)
node src/consumer/orderConsumer.js &
node src/consumer/orderConsumer.js &
node src/consumer/orderConsumer.js &
Management UI’dan kuyruk derinliğini, consumer sayısını ve mesaj geçiş hızını izleyebilirsiniz.
Yaygın Hatalar ve Çözümleri
Production ortamında karşılaştığım ve “keşke önceden bilseydim” dediğim birkaç kritik nokta:
Channel ve Connection farkı: Her işlem için yeni connection açmak kaynak israfıdır. Bir connection üzerinde birden fazla channel açılabilir. Consumer başına bir channel yeterlidir.
noAck: true kullanmak: Bunu yaparsanız mesaj işleme sırasında uygulamanız çökse bile RabbitMQ mesajı teslim edilmiş sayar. Her zaman manuel ack kullanın.
Sonsuz retry döngüsü: Hata durumunda mesajı doğrudan requeue yaparsanız (nack(msg, false, true)) mesaj sürekli kuyruğa dönüp aynı hatayı verebilir ve sisteminizi felç eder. Retry sayacı ve DLQ şarttır.
Bağlantı havuzu yokluğu: Yoğun sistemlerde tek bir channel’ı tüm producer instance’ları paylaşmak yerine channel havuzu kullanmayı düşünün.
Heartbeat ayarı: Uzun süren işlemlerde RabbitMQ bağlantıyı ölü sanıp kapatabilir. AMQP URL’ine ?heartbeat=60 ekleyin:
RABBITMQ_URL=amqp://admin:sifre@localhost:5672?heartbeat=60
Monitoring ve Alerting
Production’da şu metrikleri mutlaka izleyin:
- Queue depth: Kuyruk derinliği sürekli artıyorsa consumer’lar yetişemiyor demektir
- Consumer count: Consumer sayısı sıfıra düşerse alarm üretmeli
- Dead letter queue: Buraya düşen mesajları ihmal etmeyin
- Message rate: Saniyedeki mesaj işleme hızı baseline’ınızı bilmeniz gerekir
RabbitMQ Management API’sini kullanarak bu metrikleri Prometheus veya Grafana’ya aktarabilirsiniz. rabbitmq_prometheus eklentisi bunu çok kolaylaştırıyor:
# Management container'ında plugin'i aktifleştir
docker exec rabbitmq-dev rabbitmq-plugins enable rabbitmq_prometheus
# Metrikler artık bu endpoint'ten erişilebilir
curl http://localhost:15692/metrics
Sonuç
RabbitMQ ve Node.js kombinasyonu, servisleri birbirinden bağımsız hale getirmenin en pragmatik yollarından biri. Burada anlattıklarım bir başlangıç noktası; topic exchange’ler, fanout exchange’ler, priority queue’lar ve cluster kurulumu gibi konular ayrı yazıları hak ediyor.
En önemli çıkarımları özetlersem: bağlantı yönetimini merkezi yapın, her zaman manuel acknowledge kullanın, DLQ’yu ihmal etmeyin ve retry mantığını döngüden çıkarın. Bu dördünü doğru yaparsanız, RabbitMQ gerçek anlamda güvenilir bir mesajlaşma altyapısı sunar.
Kod örneklerini kendi senaryonuza uyarlarken kuyruk ve exchange isimlerini anlamlı tutun. Birkaç ay sonra logları incelerken “bu kuyruk ne işe yarıyordu?” sorusuyla boğuşmak istemezsiniz.
