Kafka ile Node.js Entegrasyonu: KafkaJS Kullanım Rehberi

Üretim ortamında Kafka kullanmaya başladığımda, Node.js tarafında hangi kütüphaneyi kullanacağım konusunda epey vakit harcadım. node-rdkafka mı, kafka-node mu, yoksa kafkajs mi? Sonunda kafkajs‘e karar verdim ve bu karardan hiç pişman olmadım. Saf JavaScript ile yazılmış olması, harici native bağımlılık gerektirmemesi ve aktif topluluk desteği bu seçimi kolaylaştırdı. Bu yazıda gerçek üretim senaryolarından öğrendiklerimi aktaracağım.

Neden kafkajs?

node-rdkafka C++ binding kullandığı için derleme sorunları çıkarabiliyor, özellikle Alpine tabanlı Docker imajlarında. kafka-node ise artık aktif olarak geliştirilmiyor. kafkajs ise:

  • Saf TypeScript/JavaScript ile yazılmış, native bağımlılık yok
  • Kafka 0.10+ sürümlerini destekliyor
  • Consumer group yönetimi oldukça olgunlaşmış durumda
  • Retry mekanizmaları built-in geliyor
  • TypeScript type tanımları birinci sınıf desteğe sahip

Şimdi kurulumdan başlayalım.

Kurulum ve Temel Yapılandırma

npm install kafkajs
# TypeScript kullanıyorsanız:
npm install --save-dev @types/node

Temel client yapılandırması şöyle görünüyor:

// kafka.client.js
const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'siparis-servisi',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  ssl: process.env.NODE_ENV === 'production' ? {
    rejectUnauthorized: true,
    ca: [require('fs').readFileSync('/certs/ca.crt', 'utf-8')],
    key: require('fs').readFileSync('/certs/client.key', 'utf-8'),
    cert: require('fs').readFileSync('/certs/client.crt', 'utf-8'),
  } : false,
  sasl: process.env.KAFKA_USERNAME ? {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD,
  } : undefined,
  connectionTimeout: 10000,
  requestTimeout: 30000,
  retry: {
    initialRetryTime: 300,
    retries: 10,
    maxRetryTime: 30000,
    factor: 0.2,
    multiplier: 2,
  },
  logLevel: process.env.NODE_ENV === 'production' 
    ? logLevel.WARN 
    : logLevel.DEBUG,
});

module.exports = kafka;

clientId değerini anlamlı tutun. Kafka broker loglarında ve monitoring araçlarında bu değeri görüyorsunuz, my-app gibi generic isimler değil, siparis-servisi veya odeme-consumer gibi isimler kullanın.

Producer Oluşturma ve Mesaj Gönderme

// producer.js
const kafka = require('./kafka.client');

const producer = kafka.producer({
  allowAutoTopicCreation: false, // Üretimde bu false olmalı!
  transactionTimeout: 30000,
  idempotent: true, // Exactly-once semantics için
  maxInFlightRequests: 5,
});

async function baglan() {
  await producer.connect();
  console.log('Producer bağlandı');
}

async function mesajGonder(topic, mesajlar) {
  await producer.send({
    topic,
    messages: mesajlar.map(mesaj => ({
      key: mesaj.key || null,
      value: JSON.stringify(mesaj.value),
      headers: {
        'content-type': 'application/json',
        'kaynak-servis': 'siparis-servisi',
        'timestamp': Date.now().toString(),
      },
      partition: mesaj.partition, // opsiyonel, genelde Kafka seçer
    })),
    acks: -1, // Tüm ISR replica'larından onay bekle
    timeout: 30000,
  });
}

async function baglantiKes() {
  await producer.disconnect();
}

// Uygulama kapanırken temizlik
process.on('SIGTERM', async () => {
  await baglantiKes();
  process.exit(0);
});

module.exports = { baglan, mesajGonder, baglantiKes };

allowAutoTopicCreation: false ayarına dikkat edin. Üretim ortamında topicler otomatik oluşturulmamalı. Yanlış yazılmış bir topic adı sizi sessizce farklı bir topic’e mesaj göndermeye başlatabilir ve bunu saatlerce fark etmeyebilirsiniz.

idempotent: true ayarı ise ağ hatası durumunda aynı mesajın iki kez yazılmasını engeller. Bunu açtığınızda maxInFlightRequests değeri otomatik olarak 5 ile sınırlanıyor.

Consumer Oluşturma

Consumer tarafı biraz daha karmaşık çünkü partition atama, offset yönetimi ve hata senaryolarını düşünmek gerekiyor.

// consumer.js
const kafka = require('./kafka.client');

const consumer = kafka.consumer({
  groupId: 'siparis-isle-grubu',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576, // 1MB
  minBytes: 1,
  maxBytes: 10485760, // 10MB
  maxWaitTimeInMs: 500,
  retry: {
    initialRetryTime: 300,
    retries: 10,
  },
  // Yeni consumer başladığında en baştan mı, en sondan mı okusun?
  fromBeginning: false,
});

async function baslat() {
  await consumer.connect();
  
  await consumer.subscribe({ 
    topics: ['siparis-olusturuldu', 'siparis-guncellendi'],
    fromBeginning: false,
  });

  await consumer.run({
    eachBatchAutoResolve: true,
    autoCommit: true,
    autoCommitInterval: 5000,
    autoCommitThreshold: 100,
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      const deger = JSON.parse(message.value.toString());
      const key = message.key?.toString();
      
      try {
        await siparisIsle(deger);
        
        // Uzun işlemlerde heartbeat göndermezseniz rebalance yaşanır
        await heartbeat();
        
      } catch (hata) {
        console.error(`Mesaj işleme hatası: ${hata.message}`, {
          topic,
          partition,
          offset: message.offset,
          key,
        });
        
        // Kritik hatada consumer'ı durdur ve alerting'e bildir
        if (hata.critical) {
          const resume = pause();
          setTimeout(resume, 60000); // 1 dakika bekle
        }
        
        throw hata; // Retry için fırlat
      }
    },
  });
}

async function siparisIsle(siparis) {
  // İş mantığı burada
  console.log(`Siparis işleniyor: ${siparis.id}`);
}

module.exports = { baslat };

Batch Processing ile Performans

Tek tek mesaj işlemek yerine batch olarak işlemek performansı ciddi artırıyor. Özellikle veritabanı yazma işlemleri yapıyorsanız, her mesaj için ayrı bir INSERT yerine toplu INSERT çok daha verimli.

// batch-consumer.js
await consumer.run({
  eachBatch: async ({ 
    batch, 
    resolveOffset, 
    heartbeat, 
    commitOffsetsIfNecessary,
    uncommittedOffsets,
    isRunning, 
    isStale,
    pause,
  }) => {
    const mesajlar = [];
    
    for (const mesaj of batch.messages) {
      if (!isRunning() || isStale()) break;
      
      const deger = JSON.parse(mesaj.value.toString());
      mesajlar.push({
        ...deger,
        _kafka: {
          topic: batch.topic,
          partition: batch.partition,
          offset: mesaj.offset,
        },
      });
      
      // Her 10 mesajda bir heartbeat gönder
      if (mesajlar.length % 10 === 0) {
        await heartbeat();
      }
    }
    
    if (mesajlar.length > 0) {
      // Toplu veritabanı yazımı
      await veritabaniBulkYaz(mesajlar);
      
      // Son mesajın offsetini commit et
      const sonMesaj = batch.messages[batch.messages.length - 1];
      resolveOffset(sonMesaj.offset);
      await commitOffsetsIfNecessary();
    }
  },
});

async function veritabaniBulkYaz(mesajlar) {
  // Örnek: PostgreSQL ile bulk insert
  const degerler = mesajlar.map(m => [m.id, m.durum, m.timestamp]);
  await db.query(
    'INSERT INTO siparisler (id, durum, timestamp) VALUES ?',
    [degerler]
  );
}

Bu yaklaşımda dikkat edilmesi gereken nokta: resolveOffset işlemi başarılı veritabanı yazımından sonra çağrılmalı. Önce offset commit edip sonra yazarsanız, servis çöktüğünde o mesajları tekrar okuyamazsınız.

Admin API ile Topic Yönetimi

// topic-yonetim.js
const kafka = require('./kafka.client');

const admin = kafka.admin();

async function topicOlustur(topicAdi, bolumSayisi = 6, replikaSayisi = 3) {
  await admin.connect();
  
  try {
    const mevcutTopicler = await admin.listTopics();
    
    if (mevcutTopicler.includes(topicAdi)) {
      console.log(`Topic zaten mevcut: ${topicAdi}`);
      return;
    }
    
    await admin.createTopics({
      waitForLeaders: true,
      timeout: 30000,
      topics: [{
        topic: topicAdi,
        numPartitions: bolumSayisi,
        replicationFactor: replikaSayisi,
        configEntries: [
          { name: 'retention.ms', value: '604800000' }, // 7 gün
          { name: 'cleanup.policy', value: 'delete' },
          { name: 'compression.type', value: 'lz4' },
          { name: 'min.insync.replicas', value: '2' },
        ],
      }],
    });
    
    console.log(`Topic oluşturuldu: ${topicAdi}`);
  } finally {
    await admin.disconnect();
  }
}

async function consumerGrupOffsetSifirla(grupId, topicAdi) {
  await admin.connect();
  
  try {
    // Grubu durdur
    await admin.deleteGroups([grupId]);
    
    // Offseti başa al
    await admin.resetOffsets({
      groupId: grupId,
      topic: topicAdi,
      earliest: true,
    });
    
    console.log(`Offset sıfırlandı: ${grupId} / ${topicAdi}`);
  } finally {
    await admin.disconnect();
  }
}

module.exports = { topicOlustur, consumerGrupOffsetSifirla };

min.insync.replicas: 2 ayarı önemli. Bu, producer’ın acks: -1 ile mesaj gönderirken en az 2 replica’nın onayını beklemesini sağlar. Tek broker kaldığında write işlemleri hata verir ama veri kaybı yaşamazsınız.

Gerçek Dünya Senaryosu: Sipariş İşleme Sistemi

Birleştirelim ve gerçekçi bir senaryo oluşturalım:

// siparis-servisi/index.js
const express = require('express');
const kafka = require('./kafka.client');
const { topicOlustur } = require('./topic-yonetim');

const app = express();
app.use(express.json());

const producer = kafka.producer({ idempotent: true });
const consumer = kafka.consumer({ groupId: 'siparis-isle-grubu-v2' });

// Uygulama başlangıcı
async function baslat() {
  // Önce topic'lerin var olduğundan emin ol
  await topicOlustur('siparis-olusturuldu', 6, 3);
  await topicOlustur('siparis-odeme-bekliyor', 6, 3);
  
  await producer.connect();
  await consumer.connect();
  
  await consumer.subscribe({ 
    topics: ['siparis-olusturuldu'],
    fromBeginning: false,
  });
  
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const siparis = JSON.parse(message.value.toString());
      
      console.log(`[${topic}][P${partition}][O${message.offset}] Siparis: ${siparis.id}`);
      
      // İş mantığı
      await stoguKontrolEt(siparis);
      
      // Sonraki adım için event yayınla
      await producer.send({
        topic: 'siparis-odeme-bekliyor',
        messages: [{
          key: siparis.musteriId.toString(),
          value: JSON.stringify({
            ...siparis,
            durum: 'ODEME_BEKLIYOR',
            guncellemeZamani: new Date().toISOString(),
          }),
        }],
      });
    },
  });
  
  app.listen(3000, () => {
    console.log('Servis ayakta: 3000');
  });
}

app.post('/siparis', async (req, res) => {
  const siparis = {
    id: require('crypto').randomUUID(),
    ...req.body,
    olusturmaZamani: new Date().toISOString(),
  };
  
  await producer.send({
    topic: 'siparis-olusturuldu',
    messages: [{
      key: siparis.musteriId.toString(),
      value: JSON.stringify(siparis),
    }],
  });
  
  res.status(202).json({ siparisId: siparis.id, durum: 'ISLEME_ALINDI' });
});

// Graceful shutdown
async function kapat() {
  console.log('Servis kapatılıyor...');
  await consumer.disconnect();
  await producer.disconnect();
  process.exit(0);
}

process.on('SIGTERM', kapat);
process.on('SIGINT', kapat);

baslat().catch(console.error);

musteriId değerini mesaj key’i olarak kullandığımıza dikkat edin. Bu sayede aynı müşteriye ait tüm siparişler aynı partition’a düşüyor ve işlem sırası garanti altına alınıyor. Event sourcing veya CQRS kullanıyorsanız bu kritik.

Hata Yönetimi ve Dead Letter Queue

// dlq-handler.js
const { KafkaJSNonRetriableError } = require('kafkajs');

const DLQ_TOPIC = 'siparis-hata-kuyrugu';

async function guvenliIsle(producer, consumer, islevFn) {
  return consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const hammMesaj = message.value.toString();
      
      try {
        const deger = JSON.parse(hammMesaj);
        await islevFn(deger);
        
      } catch (hata) {
        console.error('Mesaj işlenemedi, DLQ'ya gönderiliyor:', hata.message);
        
        await producer.send({
          topic: DLQ_TOPIC,
          messages: [{
            key: message.key,
            value: hammMesaj,
            headers: {
              'hata-mesaji': hata.message,
              'kaynak-topic': topic,
              'kaynak-partition': partition.toString(),
              'kaynak-offset': message.offset.toString(),
              'deneme-zamani': new Date().toISOString(),
              // Orijinal headers'ı koru
              ...message.headers,
            },
          }],
        });
        
        // DLQ'ya yazdıktan sonra offset'i ilerlet, retry yapma
        // Aksi takdirde zehirli mesaj (poison pill) tüm consumer'ı bloklar
      }
    },
  });
}

DLQ (Dead Letter Queue) implementasyonu olmadan Kafka consumer’ı üretime almayın. Bir mesaj parse edilemiyorsa veya iş mantığınız sürekli hata veriyorsa, consumer tüm partition’ı bloklar. DLQ bu tür “zehirli mesajları” devre dışı bırakmanızı sağlar.

Monitoring ve Sağlık Kontrolü

// healthcheck.js
const kafka = require('./kafka.client');
const admin = kafka.admin();

async function saglikKontrol() {
  try {
    await admin.connect();
    
    const cluster = await admin.describeCluster();
    const topics = await admin.listTopics();
    
    const saglik = {
      durum: 'saglikli',
      broker_sayisi: cluster.brokers.length,
      controller_id: cluster.controllerId,
      topic_sayisi: topics.length,
      zaman: new Date().toISOString(),
    };
    
    await admin.disconnect();
    return saglik;
    
  } catch (hata) {
    return {
      durum: 'hata',
      hata: hata.message,
      zaman: new Date().toISOString(),
    };
  }
}

// Express endpoint
app.get('/health/kafka', async (req, res) => {
  const saglik = await saglikKontrol();
  const httpKod = saglik.durum === 'saglikli' ? 200 : 503;
  res.status(httpKod).json(saglik);
});

Dikkat Edilmesi Gereken Noktalar

Üretimde kafkajs kullanırken öğrendiğim bazı kritik noktalar:

  • Rebalance fırtınaları: Consumer grubunuzdaki instance sayısını hızlıca değiştirirseniz sürekli rebalance yaşanır. sessionTimeout ve heartbeatInterval değerlerini düzgün ayarlayın. Genellikle sessionTimeout / heartbeatInterval = 3 oranı iyi çalışıyor.
  • Offset commit stratejisi: autoCommit: true kolaylık sağlar ama mesaj işlenmeden commit edilebilir. Kritik iş akışlarında manuel offset yönetimi yapın.
  • Partition sayısı: Bir topic oluşturduktan sonra partition sayısını artırabilirsiniz ama azaltamazsınız. İlk başta yeterli partition açın. Kural olarak: consumer sayısının en az 2 katı partition kullanıyorum.
  • Mesaj boyutu: Varsayılan maksimum mesaj boyutu 1MB. Büyük payload gönderecekseniz hem broker hem producer/consumer tarafında message.max.bytes ayarını güncelleyin. Ama büyük mesajlar yerine referans (ID) göndermek genelde daha iyi mimari kararı.
  • Graceful shutdown: SIGTERM sinyalini yakalayıp consumer’ı düzgün kapatmazsanız, pod yeniden başladığında rebalance gecikmesi yaşarsınız. Bu gecikme Kubernetes ortamlarında rolling deployment sırasında can sıkıcı olabiliyor.
  • Log seviyesi: logLevel.DEBUG üretimde tonlarca log üretir. logLevel.WARN veya logLevel.ERROR kullanın.
  • Connection pooling: Her servis instance’ı için tek bir Kafka client oluşturun, request başına client açıp kapamayın.

Sonuç

kafkajs, Node.js ekosisteminde Kafka entegrasyonu için olgunlaşmış ve güvenilir bir seçenek. Native bağımlılık gerektirmemesi Docker ortamlarında büyük avantaj sağlıyor. Ancak kütüphane ne kadar iyi olursa olsun, Kafka’nın temel kavramlarını, yani partition, consumer group, offset yönetimi ve replication gibi konuları anlamadan doğru implementasyon yapamıyorsunuz.

Bu yazıda anlattıklarımı özetlersek: Producer tarafında idempotent: true ve acks: -1 ile veri kaybını önleyin. Consumer tarafında heartbeat yönetimine dikkat edin, DLQ implementasyonu yapmadan üretime çıkmayın. Topic konfigürasyonunu kod ile yönetin, elle yapılan değişiklikler kaybolur. Graceful shutdown mekanizmasını muhakkak ekleyin.

Sonraki adım olarak Kafka Streams veya ksqlDB ile stream processing tarafına geçebilirsiniz. Ama önce basit producer/consumer senaryolarını üretime almanızı ve orada karşılaştığınız sorunları çözmenizi öneririm. Kafajs’in GitHub repo’sundaki issue’lar ve wiki sayfası da üretim senaryolarında oldukça faydalı referans kaynakları.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir