MongoDB Aggregation Pipeline ile Karmaşık Sorgular

MongoDB’de veri analizi yaparken basit find() sorgularının yetersiz kaldığı anı hepimiz yaşamışızdır. Onlarca koleksiyonda milyonlarca kayıt var, bunları gruplamanız, filtrelemeniz, dönüştürmeniz ve raporlamanız gerekiyor. İşte tam bu noktada Aggregation Pipeline devreye giriyor. Bunu bir veri fabrikası gibi düşünebilirsiniz: ham veri bir ucundan giriyor, her aşamada işleniyor ve sonunda istediğiniz formatta çıkıyor.

Bu yazıda gerçek dünya senaryoları üzerinden Aggregation Pipeline’ı derinlemesine inceleyeceğiz. E-ticaret sistemleri, log analizi, kullanıcı davranışı takibi gibi sysadmin ve backend dünyasında karşılaşacağınız tipik problemleri ele alacağız.

Aggregation Pipeline Nedir ve Nasıl Çalışır

Aggregation Pipeline, MongoDB’nin en güçlü özelliklerinden biridir. Temel mantık şu: her stage (aşama) bir önceki aşamanın çıktısını alır, üzerinde işlem yapar ve sonraki aşamaya iletir. SQL dünyasından geliyorsanız bunu GROUP BY, JOIN, HAVING ve ORDER BY kombinasyonu olarak düşünebilirsiniz, ama çok daha esnek.

Örnek bir pipeline şöyle görünür:

db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$customerId", totalSpent: { $sum: "$amount" } } },
  { $sort: { totalSpent: -1 } },
  { $limit: 10 }
])

Bu örnekte sırasıyla şunlar oluyor: tamamlanan siparişleri filtrele, müşteri bazında topla, harcamaya göre sırala ve ilk 10’u getir. Dört satırda güçlü bir rapor.

Temel Stage’ler ve Kullanım Alanları

$match – Filtreleme

$match pipeline’ın en sık kullanılan aşamasıdır. Normal find() gibi çalışır ama pipeline içinde kullanılır. Önemli bir kural: $match‘i pipeline’ın mümkün olduğunca başına koyun, çünkü bu sayede işlenecek doküman sayısını azaltırsınız ve index’lerden yararlanırsınız.

# Geçen ay içinde, 1000 TL üzeri, iptal edilmemiş siparişler
db.orders.aggregate([
  {
    $match: {
      createdAt: {
        $gte: ISODate("2024-11-01T00:00:00Z"),
        $lt: ISODate("2024-12-01T00:00:00Z")
      },
      amount: { $gt: 1000 },
      status: { $ne: "cancelled" }
    }
  }
])

$group – Gruplama ve Agregasyon

$group stage’i SQL’deki GROUP BY‘a karşılık gelir. _id alanı gruplama anahtarını belirler, null verirseniz tüm koleksiyon üzerinde agregasyon yapılır.

# Kategori bazında satış istatistikleri
db.products.aggregate([
  {
    $group: {
      _id: "$category",
      totalSales: { $sum: "$soldCount" },
      avgPrice: { $avg: "$price" },
      maxPrice: { $max: "$price" },
      minPrice: { $min: "$price" },
      productCount: { $count: {} }
    }
  },
  { $sort: { totalSales: -1 } }
])

Yaygın $group operatörleri:

  • $sum: Değerleri toplar
  • $avg: Ortalama hesaplar
  • $min / $max: Minimum veya maksimum değeri bulur
  • $push: Değerleri bir diziye ekler
  • $addToSet: Tekrarsız değerleri bir diziye ekler
  • $first / $last: Grubun ilk veya son değerini alır
  • $count: Doküman sayısını verir

$project – Alan Seçimi ve Dönüşüm

$project ile çıktıda hangi alanların görüneceğini ve nasıl dönüştürüleceğini belirlersiniz. SQL’deki SELECT kısmına benzer.

# Kullanıcı listesi: sadece isim, email ve üyelik süresi
db.users.aggregate([
  {
    $project: {
      _id: 0,
      fullName: { $concat: ["$firstName", " ", "$lastName"] },
      email: 1,
      membershipDays: {
        $dateDiff: {
          startDate: "$createdAt",
          endDate: "$$NOW",
          unit: "day"
        }
      },
      isVIP: {
        $cond: {
          if: { $gte: ["$totalPurchases", 50] },
          then: true,
          else: false
        }
      }
    }
  }
])

Gerçek Dünya Senaryosu 1: E-Ticaret Satış Raporu

Diyelim ki bir e-ticaret platformunun veritabanını yönetiyorsunuz. Aylık satış raporunu, kategori kırılımıyla, en çok satan ürünlerle ve ortalama sipariş değeriyle çıkarmanız gerekiyor. Bunu Aggregation Pipeline ile nasıl yaparsınız?

# Aylık satış raporu - kategori ve ürün bazlı
db.orders.aggregate([
  # Adım 1: Bu yılın verilerini filtrele
  {
    $match: {
      orderDate: { $gte: ISODate("2024-01-01T00:00:00Z") },
      status: { $in: ["completed", "delivered"] }
    }
  },
  # Adım 2: Her order içindeki items dizisini düzleştir
  { $unwind: "$items" },
  # Adım 3: Ay ve kategori bazında grupla
  {
    $group: {
      _id: {
        month: { $month: "$orderDate" },
        year: { $year: "$orderDate" },
        category: "$items.category"
      },
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
      totalOrders: { $sum: 1 },
      avgOrderValue: { $avg: { $multiply: ["$items.price", "$items.quantity"] } },
      topProducts: { $push: "$items.productName" }
    }
  },
  # Adım 4: Sonuçları düzenle
  {
    $project: {
      _id: 0,
      period: {
        $concat: [
          { $toString: "$_id.year" },
          "-",
          { $toString: "$_id.month" }
        ]
      },
      category: "$_id.category",
      totalRevenue: { $round: ["$totalRevenue", 2] },
      totalOrders: 1,
      avgOrderValue: { $round: ["$avgOrderValue", 2] }
    }
  },
  # Adım 5: Önce yıl-ay'a sonra gelire göre sırala
  { $sort: { period: 1, totalRevenue: -1 } }
])

Bu pipeline’da dikkat edilmesi gereken kritik nokta $unwind kullanımıdır. Siparişlerde items adında bir dizi var ve biz her ürünü ayrı ayrı analiz etmek istiyoruz. $unwind bu diziyi açarak her eleman için ayrı bir doküman oluşturur.

Gerçek Dünya Senaryosu 2: Uygulama Log Analizi

Sysadmin olarak en çok ihtiyaç duyduğunuz şeylerden biri log analizidir. MongoDB’de tutulan application log’larından anlamlı metrikler çıkaralım:

# Son 24 saatte endpoint bazlı hata oranı ve ortalama yanıt süresi
db.appLogs.aggregate([
  {
    $match: {
      timestamp: {
        $gte: new Date(new Date() - 24 * 60 * 60 * 1000)
      }
    }
  },
  {
    $group: {
      _id: {
        endpoint: "$endpoint",
        method: "$httpMethod"
      },
      totalRequests: { $sum: 1 },
      errorCount: {
        $sum: {
          $cond: [{ $gte: ["$statusCode", 500] }, 1, 0]
        }
      },
      avgResponseTime: { $avg: "$responseTimeMs" },
      p95ResponseTime: { $percentile: { input: "$responseTimeMs", p: [0.95], method: "approximate" } },
      uniqueUsers: { $addToSet: "$userId" }
    }
  },
  {
    $addFields: {
      errorRate: {
        $multiply: [
          { $divide: ["$errorCount", "$totalRequests"] },
          100
        ]
      },
      uniqueUserCount: { $size: "$uniqueUsers" }
    }
  },
  {
    $project: {
      uniqueUsers: 0
    }
  },
  {
    $match: {
      totalRequests: { $gt: 10 }
    }
  },
  { $sort: { errorRate: -1 } },
  { $limit: 20 }
])

Bu sorguda $addFields stage’ini dikkat edin. Hata oranını hesaplamak için gruplama sonucunda yeni bir alan ekliyoruz. Ayrıca pipeline içinde ikinci bir $match kullandık, az istek alan endpoint’leri eleyerek istatistiksel anlamsız sonuçları temizledik.

$lookup ile İlişkisel Sorgular

MongoDB NoSQL olmasına rağmen $lookup ile koleksiyonlar arası join yapabilirsiniz. Performans açısından dikkatli kullanmak gerekse de doğru senaryolarda çok işe yarar.

# Kullanıcı bilgileriyle birlikte son 30 gün sipariş özeti
db.orders.aggregate([
  {
    $match: {
      orderDate: { $gte: new Date(new Date() - 30 * 24 * 60 * 60 * 1000) }
    }
  },
  {
    $group: {
      _id: "$customerId",
      orderCount: { $sum: 1 },
      totalAmount: { $sum: "$totalPrice" },
      lastOrderDate: { $max: "$orderDate" }
    }
  },
  {
    $lookup: {
      from: "users",
      localField: "_id",
      foreignField: "_id",
      as: "customerInfo",
      pipeline: [
        { $project: { name: 1, email: 1, city: 1, memberTier: 1 } }
      ]
    }
  },
  {
    $unwind: {
      path: "$customerInfo",
      preserveNullAndEmptyArrays: false
    }
  },
  {
    $project: {
      _id: 0,
      customerId: "$_id",
      customerName: "$customerInfo.name",
      email: "$customerInfo.email",
      city: "$customerInfo.city",
      memberTier: "$customerInfo.memberTier",
      orderCount: 1,
      totalAmount: { $round: ["$totalAmount", 2] },
      lastOrderDate: 1
    }
  },
  { $sort: { totalAmount: -1 } }
])

$lookup içinde pipeline parametresi kullandığımıza dikkat edin. Bu sayede join yapılacak koleksiyondan sadece ihtiyacımız olan alanları çekiyoruz, gereksiz veri taşımıyoruz. Bu özellik MongoDB 3.6 ile geldi ve performans açısından çok kritik.

$facet ile Çok Boyutlu Analiz

$facet stage’i tek bir pipeline içinde paralel agregasyonlar yapmanıza olanak tanır. Özellikle arama ve filtreleme sonuçlarıyla birlikte özet istatistikler döndürmek istediğinizde çok kullanışlıdır.

# Ürün arama: sonuçlar + filtre seçenekleri + istatistikler
db.products.aggregate([
  {
    $match: {
      $text: { $search: "laptop" },
      inStock: true
    }
  },
  {
    $facet: {
      # Sayfalı sonuçlar
      results: [
        { $sort: { score: { $meta: "textScore" }, rating: -1 } },
        { $skip: 0 },
        { $limit: 20 },
        { $project: { name: 1, price: 1, brand: 1, rating: 1, image: 1 } }
      ],
      # Toplam sayı
      totalCount: [
        { $count: "total" }
      ],
      # Marka filtreleri
      brandFilters: [
        { $group: { _id: "$brand", count: { $sum: 1 } } },
        { $sort: { count: -1 } },
        { $limit: 10 }
      ],
      # Fiyat aralığı
      priceStats: [
        {
          $group: {
            _id: null,
            minPrice: { $min: "$price" },
            maxPrice: { $max: "$price" },
            avgPrice: { $avg: "$price" }
          }
        }
      ],
      # Puan dağılımı
      ratingDistribution: [
        {
          $bucket: {
            groupBy: "$rating",
            boundaries: [0, 2, 3, 4, 4.5, 5],
            default: "other",
            output: { count: { $sum: 1 } }
          }
        }
      ]
    }
  }
])

Bu tek sorgu ile hem arama sonuçlarını hem de frontend’in ihtiyaç duyduğu tüm filtre ve istatistik verilerini tek seferde çekiyorsunuz. Birden fazla sorgu yerine bir sorgu, hem network trafiği hem de uygulama karmaşıklığı açısından büyük avantaj.

Pipeline Optimizasyonu: Performansı Artırma Teknikleri

Aggregation Pipeline güçlü ama dikkatsiz kullanılırsa yavaş olabilir. Özellikle büyük koleksiyonlarda bunu çok net hissedersiniz.

Index kullanımı: Pipeline’ın başındaki $match ve $sort stage’leri index kullanabilir. Bunu doğrulamak için explain() kullanın:

# Pipeline'ın nasıl çalıştığını analiz et
db.orders.explain("executionStats").aggregate([
  { $match: { status: "completed", orderDate: { $gte: ISODate("2024-01-01") } } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
])

explain() çıktısında IXSCAN görüyorsanız index kullanılıyor demektir, COLLSCAN görüyorsanız tüm koleksiyon taranıyor ve index eklemeniz gerekiyor.

allowDiskUse: Büyük veri setlerinde gruplama işlemleri bellekte yapılamayabilir. MongoDB varsayılan olarak 100MB bellek sınırı uygular:

# Disk kullanımına izin vererek büyük pipeline çalıştır
db.bigCollection.aggregate(
  [
    { $group: { _id: "$category", total: { $sum: "$value" } } },
    { $sort: { total: -1 } }
  ],
  { allowDiskUse: true }
)

$match erken, $project geç: $match‘i her zaman önce koyun. $project ile gereksiz alanları erken eleyebilirsiniz ama $match‘ten sonra yapın ki index’ten yararlanın. Pipeline tasarımında temel kural: önce veriyi küçült, sonra dönüştür.

$lookup’ı minimize edin: Mümkünse $lookup öncesinde $match ile veri setini küçültün. $lookup içindeki pipeline parametresiyle yalnızca ihtiyacınız olan alanları çekin.

Gerçek Dünya Senaryosu 3: Cohort Analizi

SaaS ürünlerinde cohort analizi kritiktir. Aynı dönemde kayıt olan kullanıcıların ilerleyen aylarda ne kadarının aktif kaldığını ölçmek için Aggregation Pipeline kullanabilirsiniz:

# Kayıt ayına göre kullanıcı retention analizi
db.userActivity.aggregate([
  # Kullanıcı bazlı aktivite özetini çıkar
  {
    $group: {
      _id: "$userId",
      firstActivityMonth: {
        $min: {
          $dateToString: { format: "%Y-%m", date: "$activityDate" }
        }
      },
      activeMonths: {
        $addToSet: {
          $dateToString: { format: "%Y-%m", date: "$activityDate" }
        }
      }
    }
  },
  # Cohort boyutunu hesapla
  {
    $group: {
      _id: "$firstActivityMonth",
      cohortSize: { $sum: 1 },
      users: {
        $push: {
          userId: "$_id",
          activeMonths: "$activeMonths"
        }
      }
    }
  },
  {
    $project: {
      _id: 0,
      cohortMonth: "$_id",
      cohortSize: 1,
      month1Retention: {
        $divide: [
          {
            $size: {
              $filter: {
                input: "$users",
                cond: {
                  $in: [
                    { $concat: ["$_id", "-next"] },
                    "$$this.activeMonths"
                  ]
                }
              }
            }
          },
          "$cohortSize"
        ]
      }
    }
  },
  { $sort: { cohortMonth: 1 } }
])

$bucket ve $bucketAuto ile Histogram

Veri dağılımını anlamak için histogram oluşturmak sık ihtiyaç duyulan bir işlemdir:

# Sipariş tutarı dağılımı - otomatik bucket'lar
db.orders.aggregate([
  { $match: { status: "completed" } },
  {
    $bucketAuto: {
      groupBy: "$totalAmount",
      buckets: 10,
      output: {
        count: { $sum: 1 },
        avgAmount: { $avg: "$totalAmount" },
        totalRevenue: { $sum: "$totalAmount" }
      }
    }
  },
  {
    $project: {
      range: {
        $concat: [
          { $toString: { $round: ["$_id.min", 0] } },
          " - ",
          { $toString: { $round: ["$_id.max", 0] } },
          " TL"
        ]
      },
      count: 1,
      avgAmount: { $round: ["$avgAmount", 2] },
      totalRevenue: { $round: ["$totalRevenue", 2] }
    }
  }
])

$bucketAuto veriyi otomatik olarak eşit dağılımlı gruplara böler. Manuel olarak sınır belirlemek istemiyorsanız bu çok pratik bir çözümdür.

Pipeline Sonuçlarını Dışa Aktarma

Büyük pipeline sonuçlarını başka bir koleksiyona yazmak için $out veya $merge kullanabilirsiniz. Özellikle periyodik raporlama için çok kullanışlıdır:

# Günlük satış özeti koleksiyonunu güncelle
db.orders.aggregate([
  {
    $match: {
      orderDate: {
        $gte: ISODate("2024-12-01T00:00:00Z"),
        $lt: ISODate("2024-12-02T00:00:00Z")
      }
    }
  },
  {
    $group: {
      _id: {
        date: { $dateToString: { format: "%Y-%m-%d", date: "$orderDate" } },
        category: "$category"
      },
      revenue: { $sum: "$totalAmount" },
      orderCount: { $sum: 1 }
    }
  },
  {
    $merge: {
      into: "dailySalesSummary",
      on: ["_id"],
      whenMatched: "replace",
      whenNotMatched: "insert"
    }
  }
])

$merge, $out‘un aksine koleksiyonu tamamen silip yeniden yazmak yerine mevcut verileri günceller. Production ortamında raporlama koleksiyonlarını güncellerken bu farkı göz önünde bulundurun.

Sonuç

Aggregation Pipeline, MongoDB’nin gerçek gücünü ortaya koyan özelliktir. Başta karmaşık görünen bu yapı, pratikte çok katmanlı veri işleme ihtiyaçlarınızı SQL’e gerek duymadan çözmenizi sağlar.

Önemli noktalara tekrar değinelim: $match‘i her zaman pipeline’ın başına koyun ve index’lerinizi buna göre tasarlayın. $lookup kullanırken veri setini önceden küçültün. Büyük pipeline’larda explain() ile performans analizi yapın ve gerekirse allowDiskUse: true seçeneğini kullanın. $facet ile tek sorguda birden fazla analiz yaparak veritabanına gidiş-dönüş sayısını azaltın.

Gerçek dünyada işler her zaman düzgün yürümez, koleksiyonlar büyür, sorgular yavaşlar. Bu yüzden pipeline’larınızı sadece çalışır hale getirmekle kalmayıp, explain() ile düzenli olarak gözden geçirme alışkanlığı edinin. Index stratejinizi Aggregation Pipeline kullanım kalıplarınıza göre şekillendirin. Böylece hem geliştirici hem sysadmin perspektifinden MongoDB’yi en verimli şekilde kullanmış olursunuz.

Yorum yapın