MongoDB Change Streams ile Gerçek Zamanlı Veri Takibi

Bir production ortamında aniden “veritabanında bir şey değişti ama biz haberdar olamadık” dediğinizde, bu genellikle pahalıya patlayan bir duruma dönüşür. E-ticaret sitenizde stok sıfıra düştü ama bildirim gitdi, kullanıcı silinidi ama cache temizlenmedi, ödeme durumu güncellendi ama muhasebe sistemi habersiz kaldı. İşte tam bu noktada MongoDB Change Streams devreye giriyor ve hayatı ciddi ölçüde kolaylaştırıyor.

Change Streams Nedir ve Neden Önemlidir

MongoDB Change Streams, 3.6 sürümüyle hayatımıza girdi ve temel olarak şunu yapıyor: MongoDB’nin iç oplog mekanizmasını kullanarak koleksiyonlarda, veritabanlarında veya tüm deployment’ta gerçekleşen değişiklikleri gerçek zamanlı olarak dinlemenizi sağlıyor.

Klasik yaklaşımda ne yapardınız? Ya veritabanını polling ile sürekli sorgulardınız (gereksiz yük, gecikme var), ya da uygulama seviyesinde “şunu yaptım, bunu da yap” mantığıyla ilerlerdiniz (dağınık, fragile mimari). Change Streams bu ikisinden de daha temiz bir çözüm sunuyor.

Temel özellikleri şunlar:

  • Koleksiyon, veritabanı veya deployment seviyesinde dinleme
  • Insert, update, replace, delete ve drop olaylarını yakalama
  • Resume token ile kesintiden sonra kaldığı yerden devam etme
  • Pipeline ile filtreleme ve dönüştürme
  • Replica set ve sharded cluster desteği

Önemli bir not: Change Streams sadece replica set’lerde çalışır. Standalone bir MongoDB instance’ında kullanamazsınız. Production’da zaten replica set kullanıyorsunuzdur umarım, ama test ortamı kuruyorsanız bunu göz önünde bulundurun.

Ortam Hazırlığı

Önce test için basit bir replica set kuralım. Docker Compose kullanacağız:

cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  mongo1:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    ports:
      - "27017:27017"
    volumes:
      - mongo1_data:/data/db
  
  mongo2:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    ports:
      - "27018:27017"
    volumes:
      - mongo2_data:/data/db

  mongo3:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    ports:
      - "27019:27017"
    volumes:
      - mongo3_data:/data/db

volumes:
  mongo1_data:
  mongo2_data:
  mongo3_data:
EOF

docker-compose up -d

# Replica set başlatma
sleep 5
docker exec -it $(docker ps -qf "name=mongo1") mongosh --eval "
rs.initiate({
  _id: 'rs0',
  members: [
    {_id: 0, host: 'mongo1:27017'},
    {_id: 1, host: 'mongo2:27017'},
    {_id: 2, host: 'mongo3:27017'}
  ]
})
"

Replica set hazır olduğunda Node.js ile temel bir change stream listener yazalım. Önce dependency’leri kuralım:

mkdir change-stream-demo && cd change-stream-demo
npm init -y
npm install mongodb dotenv winston

Temel Change Stream Kullanımı

İlk change stream listener’ımızı yazalım:

cat > basic-listener.js << 'EOF'
const { MongoClient } = require('mongodb');

async function watchCollection() {
  const client = new MongoClient('mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0');
  
  await client.connect();
  console.log('MongoDB bağlantısı kuruldu');
  
  const db = client.db('ecommerce');
  const collection = db.collection('orders');
  
  // Temel change stream açma
  const changeStream = collection.watch();
  
  changeStream.on('change', (change) => {
    console.log('Değişiklik algılandı:');
    console.log(JSON.stringify(change, null, 2));
  });
  
  changeStream.on('error', (error) => {
    console.error('Change stream hatası:', error);
  });
  
  // Test için veri ekleyelim
  setTimeout(async () => {
    await collection.insertOne({
      orderId: 'ORD-001',
      customer: 'Ahmet Yılmaz',
      total: 299.90,
      status: 'pending',
      createdAt: new Date()
    });
    console.log('Test verisi eklendi');
  }, 2000);
  
  // 30 saniye sonra kapat
  setTimeout(async () => {
    await changeStream.close();
    await client.close();
    console.log('Bağlantı kapatıldı');
  }, 30000);
}

watchCollection().catch(console.error);
EOF

node basic-listener.js

Bu çıktıyı göreceksiniz:

# Örnek çıktı:
# {
#   "_id": { "_data": "826549..." },
#   "operationType": "insert",
#   "clusterTime": { "$timestamp": { "t": 1699..., "i": 1 } },
#   "fullDocument": {
#     "_id": ObjectId("..."),
#     "orderId": "ORD-001",
#     "customer": "Ahmet Yılmaz",
#     "total": 299.9,
#     "status": "pending",
#     "createdAt": ISODate("...")
#   },
#   "ns": { "db": "ecommerce", "coll": "orders" },
#   "documentKey": { "_id": ObjectId("...") }
# }

Pipeline ile Filtreleme

Her değişikliği işlemek istemeyebilirsiniz. Aggregation pipeline kullanarak sadece ilgilendiğiniz değişiklikleri dinleyebilirsiniz:

cat > filtered-listener.js << 'EOF'
const { MongoClient } = require('mongodb');

async function watchWithFilter() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  
  const collection = client.db('ecommerce').collection('orders');
  
  // Sadece belirli olayları dinle
  const pipeline = [
    {
      $match: {
        $or: [
          // Sadece insert ve update'leri al
          { operationType: 'insert' },
          { operationType: 'update' },
          // Status değişikliklerini yakala
          { 'updateDescription.updatedFields.status': { $exists: true } }
        ]
      }
    },
    {
      // Sadece ihtiyacımız olan alanları al
      $project: {
        operationType: 1,
        'fullDocument.orderId': 1,
        'fullDocument.status': 1,
        'fullDocument.total': 1,
        'updateDescription.updatedFields': 1,
        documentKey: 1
      }
    }
  ];
  
  // fullDocument için updateLookup kullan
  const options = {
    fullDocument: 'updateLookup'
  };
  
  const changeStream = collection.watch(pipeline, options);
  
  changeStream.on('change', async (change) => {
    switch(change.operationType) {
      case 'insert':
        console.log(`Yeni sipariş: ${change.fullDocument.orderId} - ${change.fullDocument.total} TL`);
        await notifyNewOrder(change.fullDocument);
        break;
      
      case 'update':
        const updatedFields = change.updateDescription?.updatedFields;
        if (updatedFields?.status) {
          console.log(`Sipariş durumu değişti: ${change.documentKey._id} -> ${updatedFields.status}`);
          await notifyStatusChange(change.documentKey._id, updatedFields.status);
        }
        break;
    }
  });
  
  console.log('Filtrelenmiş change stream dinleniyor...');
}

async function notifyNewOrder(order) {
  // Burada Slack, email, SMS entegrasyonu yapılabilir
  console.log(`[BİLDİRİM] Yeni sipariş bildirimi gönderildi: ${order.orderId}`);
}

async function notifyStatusChange(orderId, newStatus) {
  console.log(`[BİLDİRİM] Sipariş ${orderId} durumu "${newStatus}" olarak güncellendi`);
}

watchWithFilter().catch(console.error);
EOF

fullDocument seçenekleri şunlardır:

  • default: Update olaylarında fullDocument gelmez, sadece değişen alanlar gelir
  • updateLookup: Update sonrasındaki tam dokümanı getirir (ek sorgu maliyeti var)
  • whenAvailable: Mümkünse getirir, yoksa null döner (MongoDB 6.0+)
  • required: Her zaman getirir, yoksa hata fırlatır (MongoDB 6.0+)

Resume Token ile Kesinti Yönetimi

Production’da en kritik konu bu. Uygulamanız çöktüğünde, yeniden başladığında kaldığı yerden devam edebilmeli:

cat > resilient-listener.js << 'EOF'
const { MongoClient } = require('mongodb');
const fs = require('fs');
const path = require('path');

const TOKEN_FILE = '/var/lib/change-stream/resume-token.json';

function saveResumeToken(token) {
  const dir = path.dirname(TOKEN_FILE);
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir, { recursive: true });
  }
  fs.writeFileSync(TOKEN_FILE, JSON.stringify(token));
}

function loadResumeToken() {
  try {
    if (fs.existsSync(TOKEN_FILE)) {
      return JSON.parse(fs.readFileSync(TOKEN_FILE, 'utf8'));
    }
  } catch (err) {
    console.warn('Resume token okunamadı, baştan başlanıyor');
  }
  return null;
}

async function resilientWatcher() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  
  const collection = client.db('ecommerce').collection('orders');
  
  const savedToken = loadResumeToken();
  const options = {
    fullDocument: 'updateLookup'
  };
  
  if (savedToken) {
    options.resumeAfter = savedToken;
    console.log('Kaldığı yerden devam ediliyor...');
  } else {
    console.log('İlk kez başlatılıyor...');
  }
  
  const changeStream = collection.watch([], options);
  
  changeStream.on('change', async (change) => {
    try {
      // Değişikliği işle
      await processChange(change);
      
      // Başarıyla işlendiyse token'ı kaydet
      saveResumeToken(change._id);
      
    } catch (err) {
      console.error('Değişiklik işlenirken hata:', err);
      // Token kaydetme, bir sonraki başlatmada bu event tekrar gelsin
    }
  });
  
  // Graceful shutdown
  process.on('SIGTERM', async () => {
    console.log('Kapatılıyor...');
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
  
  process.on('SIGINT', async () => {
    console.log('Kapatılıyor...');
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
}

async function processChange(change) {
  console.log(`İşleniyor: ${change.operationType} - ${change._id._data}`);
  // İş mantığı burada
}

resilientWatcher().catch(console.error);
EOF

Önemli bir uyarı: Resume token, MongoDB oplog’unun retain ettiği süre kadar geçerlidir. Eğer uygulamanız oplog window’undan daha uzun süre kapalı kalırsa, token geçersiz hale gelir. Varsayılan oplog boyutu deployment’a göre değişir, production’da bunu izlemeniz gerekir:

# Oplog durumunu kontrol et
mongosh --eval "rs.printReplicationInfo()"

# Oplog boyutunu kontrol et
mongosh --eval "db.getSiblingDB('local').oplog.rs.stats(1024*1024)"

Gerçek Dünya Senaryosu: E-ticaret Stok Takibi

Elimdeki en somut senaryolardan biri: Bir e-ticaret sisteminde stok sıfıra düştüğünde otomatik tedarikçi bildirimi ve cache invalidation yapılması.

cat > stock-monitor.js << 'EOF'
const { MongoClient } = require('mongodb');

class StockMonitor {
  constructor(mongoUri) {
    this.client = new MongoClient(mongoUri);
    this.changeStream = null;
    this.retryDelay = 5000;
    this.maxRetries = 10;
  }
  
  async start() {
    await this.client.connect();
    console.log('Stok monitörü başlatıldı');
    
    const pipeline = [
      {
        $match: {
          'ns.coll': 'products',
          operationType: { $in: ['update', 'replace'] },
          $or: [
            { 'updateDescription.updatedFields.stock': { $exists: true } },
            { 'updateDescription.updatedFields.reserved': { $exists: true } }
          ]
        }
      }
    ];
    
    this.changeStream = this.client
      .db('ecommerce')
      .watch(pipeline, { fullDocument: 'updateLookup' });
    
    this.changeStream.on('change', this.handleChange.bind(this));
    this.changeStream.on('error', this.handleError.bind(this));
    
    console.log('Stok değişiklikleri izleniyor...');
  }
  
  async handleChange(change) {
    const product = change.fullDocument;
    
    if (!product) return;
    
    const availableStock = product.stock - (product.reserved || 0);
    
    console.log(`Ürün güncellendi: ${product.name} - Mevcut stok: ${availableStock}`);
    
    // Cache'i geçersiz kıl
    await this.invalidateCache(product._id);
    
    // Stok kritik seviyeye düştü mü?
    if (availableStock <= product.lowStockThreshold) {
      await this.sendLowStockAlert(product, availableStock);
    }
    
    // Stok sıfıra düştü mü?
    if (availableStock <= 0) {
      await this.markProductOutOfStock(product._id);
      await this.notifySupplier(product);
    }
    
    // Stok tekrar doldu mu?
    if (availableStock > 0 && product.status === 'out_of_stock') {
      await this.markProductInStock(product._id);
      await this.notifyWaitingCustomers(product._id);
    }
  }
  
  async invalidateCache(productId) {
    // Redis veya başka cache sistemine bağlı olarak implement edilir
    console.log(`[CACHE] Ürün cache'i temizlendi: ${productId}`);
  }
  
  async sendLowStockAlert(product, currentStock) {
    console.log(`[UYARI] Düşük stok: ${product.name} - Kalan: ${currentStock} adet`);
    // Slack webhook, email vb. burada
  }
  
  async markProductOutOfStock(productId) {
    const db = this.client.db('ecommerce');
    await db.collection('products').updateOne(
      { _id: productId },
      { $set: { status: 'out_of_stock', outOfStockAt: new Date() } }
    );
    console.log(`[STOK] Ürün stok dışı işaretlendi: ${productId}`);
  }
  
  async markProductInStock(productId) {
    const db = this.client.db('ecommerce');
    await db.collection('products').updateOne(
      { _id: productId },
      { $set: { status: 'active' }, $unset: { outOfStockAt: 1 } }
    );
    console.log(`[STOK] Ürün tekrar stoğa girdi: ${productId}`);
  }
  
  async notifySupplier(product) {
    console.log(`[TEDARİKÇİ] ${product.supplier} firmasına stok uyarısı gönderildi: ${product.name}`);
  }
  
  async notifyWaitingCustomers(productId) {
    const db = this.client.db('ecommerce');
    const waitlist = await db.collection('stock_waitlist')
      .find({ productId })
      .toArray();
    
    console.log(`[MÜŞTERİ] ${waitlist.length} bekleyen müşteriye bildirim gönderildi`);
  }
  
  async handleError(error) {
    console.error('Change stream hatası:', error.message);
    
    if (error.hasErrorLabel('ResumableChangeStreamError')) {
      console.log('Yeniden bağlanılıyor...');
      await this.reconnect();
    }
  }
  
  async reconnect(retries = 0) {
    if (retries >= this.maxRetries) {
      console.error('Maksimum yeniden bağlantı denemesi aşıldı');
      process.exit(1);
    }
    
    await new Promise(r => setTimeout(r, this.retryDelay));
    
    try {
      await this.start();
    } catch (err) {
      console.error(`Yeniden bağlantı başarısız (deneme ${retries + 1}):`, err.message);
      await this.reconnect(retries + 1);
    }
  }
}

const monitor = new StockMonitor('mongodb://localhost:27017/?replicaSet=rs0');
monitor.start().catch(console.error);
EOF

Systemd Servis Olarak Çalıştırma

Production’da bu listener’ları servis olarak çalıştırmak gerekiyor:

cat > /etc/systemd/system/mongodb-change-stream.service << 'EOF'
[Unit]
Description=MongoDB Change Stream Listener
After=network.target mongod.service
Requires=mongod.service

[Service]
Type=simple
User=nodeapp
WorkingDirectory=/opt/change-stream-demo
ExecStart=/usr/bin/node /opt/change-stream-demo/stock-monitor.js
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=mongodb-change-stream
Environment=NODE_ENV=production
Environment=MONGO_URI=mongodb://app_user:password@mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0&authSource=admin

# Güvenlik ayarları
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/var/lib/change-stream

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable mongodb-change-stream
systemctl start mongodb-change-stream
systemctl status mongodb-change-stream

# Logları takip et
journalctl -u mongodb-change-stream -f

İzleme ve Performans İpuçları

Change stream kullanırken dikkat edilmesi gereken birkaç önemli nokta var:

Oplog boyutunu izleyin:

# Oplog doluluk oranını izle
mongosh --eval "
var stats = db.getSiblingDB('local').oplog.rs.stats();
var usedMB = stats.size / (1024*1024);
var maxMB = stats.maxSize / (1024*1024);
var percent = (usedMB/maxMB*100).toFixed(2);
print('Oplog kullanımı: ' + usedMB.toFixed(2) + ' MB / ' + maxMB.toFixed(2) + ' MB (' + percent + '%)');
"

# Oplog window'unu kontrol et (kaç saatlik veri tutuluyor)
mongosh --eval "rs.printReplicationInfo()"

Change stream sayısını sınırın:

Her change stream bir cursor açar ve sunucu kaynağı tüketir. Tek bir listener ile birden fazla koleksiyonu dinlemek için veritabanı seviyesinde watch kullanabilirsiniz:

cat > multi-collection-listener.js << 'EOF'
const { MongoClient } = require('mongodb');

async function watchDatabase() {
  const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
  await client.connect();
  
  const db = client.db('ecommerce');
  
  // Tüm veritabanını tek bir stream ile dinle
  const pipeline = [
    {
      $match: {
        'ns.coll': { $in: ['orders', 'products', 'users', 'payments'] },
        operationType: { $in: ['insert', 'update', 'delete'] }
      }
    }
  ];
  
  const changeStream = db.watch(pipeline, { fullDocument: 'updateLookup' });
  
  changeStream.on('change', (change) => {
    const collection = change.ns.coll;
    const operation = change.operationType;
    
    console.log(`[${collection.toUpperCase()}] ${operation}: ${change.documentKey._id}`);
    
    // Router pattern
    const handlers = {
      orders: handleOrderChange,
      products: handleProductChange,
      users: handleUserChange,
      payments: handlePaymentChange
    };
    
    const handler = handlers[collection];
    if (handler) {
      handler(change).catch(err => 
        console.error(`${collection} handler hatası:`, err)
      );
    }
  });
  
  console.log('Tüm koleksiyonlar tek stream ile dinleniyor');
}

async function handleOrderChange(change) {
  console.log(`Sipariş değişikliği: ${change.operationType}`);
}

async function handleProductChange(change) {
  console.log(`Ürün değişikliği: ${change.operationType}`);
}

async function handleUserChange(change) {
  console.log(`Kullanıcı değişikliği: ${change.operationType}`);
}

async function handlePaymentChange(change) {
  console.log(`Ödeme değişikliği: ${change.operationType}`);
}

watchDatabase().catch(console.error);
EOF

Performans için pipeline optimizasyonu:

  • $match stage’ini her zaman pipeline’ın başına koyun, böylece MongoDB erken filtreleme yapabilir
  • $project ile sadece ihtiyacınız olan alanları getirin, fullDocument tüm dokümanı yükler
  • updateLookup ek bir okuma maliyeti yaratır, gerçekten ihtiyacınız varsa kullanın
  • Yüksek yazma hacimli koleksiyonlarda change stream’i dikkatli kullanın, backpressure oluşabilir

Güvenlik yapılandırması:

# Change stream için minimum yetkili kullanıcı oluştur
mongosh --eval "
db.getSiblingDB('admin').createUser({
  user: 'change_stream_user',
  pwd: 'guclu_bir_sifre',
  roles: [
    { role: 'read', db: 'ecommerce' },
    { role: 'read', db: 'local' }
  ]
})
"

Yaygın Hatalar ve Çözümleri

Pratikte sık karşılaşılan durumlar:

MongoServerError: The $changeStream stage is only supported on replica sets hatasında standalone MongoDB kullandığınız anlamına gelir. Replica set kurmak zorundasınız.

Resume token geçersiz hata aldığınızda oplog’un istenen token’dan daha eski kısmı temizlenmiş demektir. Bu durumda startAfter yerine startAtOperationTime ile belirli bir zamandan itibaren başlamayı deneyebilirsiniz.

Change stream cursor closed hatası genellikle ağ kesintisi veya primary değişimi (failover) sonrasında olur. ResumableChangeStreamError label’ına bakarak otomatik yeniden bağlantı mantığı kurmalısınız.

Yüksek hacimli insert’lerde event loop dolabilir. Bu durumda işlemeyi asenkron kuyrukla (bull, BullMQ gibi) ayırmanız daha sağlıklı olur.

Sonuç

MongoDB Change Streams, doğru kullanıldığında uygulama mimarinizi ciddi ölçüde sadeleştiriyor. Polling yerine event-driven bir yaklaşım sunuyor, uygulama seviyesindeki “şunu yaptıktan sonra bunu da tetikle” karmaşıklığını veritabanından okuyarak çözüyor.

Production’a geçmeden önce kontrol listeniz şunları içermeli:

  • Replica set zorunlu, standalone’da çalışmaz
  • Resume token’ı persistent storage’a kaydet, uygulama kapanmalarına karşı hazırlıklı ol
  • Oplog boyutunu ve window’unu izle, token’ların expire olmamasına dikkat et
  • Pipeline’da $match’i öne al, gereksiz fullDocument lookup’tan kaçın
  • Hata yönetimini ihmal etme, özellikle ResumableChangeStreamError’ı yakala
  • Tek bir stream ile birden fazla koleksiyonu dinlemeyi değerlendir

Stok takibi, audit logging, cache invalidation, gerçek zamanlı bildirim sistemleri gibi kullanım senaryolarında Change Streams neredeyse vazgeçilmez hale geliyor. Bir kez uygun şekilde kurduğunuzda, “o değişiklikten haberdar olamadık” sorununu tamamen arkada bırakıyorsunuz.

Yorum yapın