Continuous Query ile Veri Özetleme: InfluxDB

Zaman serisi verilerini tutmak kolaydır ama zamanla bu veriler devasa boyutlara ulaşır. Bir üretim ortamında her saniye gelen metrikler, haftalar sonra terabaytlarca veri anlamına gelebilir. İşte burada InfluxDB’nin Continuous Query (CQ) özelliği devreye girer. CQ, periyodik olarak çalışan ve ham veriyi özetleyerek daha küçük, yönetilebilir veri noktalarına indirgeyen otomatik sorgulardır. Bu yazıda CQ’nun ne işe yaradığını, nasıl kurulacağını ve gerçek dünya senaryolarında nasıl kullanılacağını ele alacağız.

Continuous Query Nedir ve Neden Gereklidir?

Düşün: Bir sunucu izleme sistemi kuruyorsun. Her 10 saniyede bir CPU, bellek, disk I/O ve ağ trafiği metrikleri geliyor. 50 sunucu için bu, dakikada 6 veri noktası anlamına gelir, yani saatte 360, günde 8640 kayıt. 50 sunucu için günde 432.000 satır. Bir yıl sonra 157 milyonun üzerinde satır birikir.

Ama gerçekte bunu düşün: Son 1 saatin verisine bakarken 10 saniyelik granülasyon mantıklı. Ama 6 ay önceki Pazartesi sabahı saat 3’teki CPU kullanımına bakarken 10 saniyelik detaya ihtiyacın var mı? Büyük ihtimalle saatlik ortalama yeterli.

Continuous Query tam burada devreye girer:

  • Ham veri: 10 saniyelik granülasyon, son 7 gün tutulur
  • 1 saatlik özet: Saatlik ortalama/min/max, son 1 yıl tutulur
  • Günlük özet: Günlük istatistikler, sonsuza kadar tutulabilir

Bu yaklaşıma downsampling denir ve InfluxDB’de CQ + Retention Policy kombinasyonuyla uygulanır.

Retention Policy ile CQ İlişkisi

CQ’yu anlamadan önce Retention Policy (RP) kavramını netleştirmek lazım. RP, verinin ne kadar süre tutulacağını belirler.

# Mevcut retention policy'leri listele
influx -execute "SHOW RETENTION POLICIES ON mydb"

# Yeni bir retention policy oluştur
influx -execute "CREATE RETENTION POLICY 'one_week' ON 'mydb' DURATION 7d REPLICATION 1 DEFAULT"

# Uzun süreli özet verisi için RP
influx -execute "CREATE RETENTION POLICY 'one_year' ON 'mydb' DURATION 52w REPLICATION 1"

# Sonsuz saklama için
influx -execute "CREATE RETENTION POLICY 'infinite' ON 'mydb' DURATION INF REPLICATION 1"

Tipik bir yapıda şunu kurarsın: Varsayılan RP ham veriyi 7 gün tutar, one_year adlı RP saatlik özetleri 1 yıl tutar, infinite adlı RP ise günlük özetleri sonsuza kadar saklar. CQ, ham veriden saatlik özete, saatlik özetten günlük özete veriyi otomatik olarak aktarır.

İlk Continuous Query: Temel Sözdizimi

CQ sözdizimi standart InfluxQL’e oldukça benzer ama birkaç kritik farkı var.

-- Temel CQ yapısı
CREATE CONTINUOUS QUERY "cq_cpu_hourly"
ON "mydb"
BEGIN
  SELECT mean("value") AS "mean_value",
         max("value") AS "max_value",
         min("value") AS "min_value"
  INTO "one_year"."cpu_usage_hourly"
  FROM "cpu_usage"
  GROUP BY time(1h), *
END

Bu sorgu her saat otomatik çalışır ve cpu_usage measurement’ından saatlik ortalama, maksimum ve minimum değerleri alarak one_year retention policy altındaki cpu_usage_hourly measurement’ına yazar. GROUP BY time(1h), * kısmındaki yıldız, tüm tag’leri koruyarak gruplama yapar. Yani sunucu adı, datacenter, servis gibi tag’lerin tamamı özetlenmiş veriye aktarılır.

Şimdi bu CQ’nun çalıştığını doğrulayalım:

# Tüm CQ'ları listele
influx -execute "SHOW CONTINUOUS QUERIES"

# Belirli bir veritabanındaki CQ'lar
influx -execute "SHOW CONTINUOUS QUERIES" -database "mydb"

Gerçek Dünya Senaryosu 1: Sunucu İzleme Sistemi

Bir e-ticaret şirketinde 80 uygulama sunucusu yönetiyorsun. Telegraf ile her 10 saniyede bir metrik topluyorsun. İşte bu senaryo için eksiksiz bir CQ yapısı:

# Önce veritabanı ve RP'leri kur
influx << 'EOF'
CREATE DATABASE monitoring

CREATE RETENTION POLICY "raw" ON "monitoring" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "hourly" ON "monitoring" DURATION 365d REPLICATION 1
CREATE RETENTION POLICY "daily" ON "monitoring" DURATION INF REPLICATION 1

-- CPU için 5 dakikalık özet
CREATE CONTINUOUS QUERY "cq_cpu_5min"
ON "monitoring"
BEGIN
  SELECT mean("usage_idle") AS "usage_idle",
         mean("usage_user") AS "usage_user",
         mean("usage_system") AS "usage_system",
         max("usage_user") AS "max_usage_user"
  INTO "hourly"."cpu_5min"
  FROM "raw"."cpu"
  GROUP BY time(5m), *
END

-- Bellek için 5 dakikalık özet
CREATE CONTINUOUS QUERY "cq_mem_5min"
ON "monitoring"
BEGIN
  SELECT mean("used_percent") AS "used_percent",
         max("used_percent") AS "max_used_percent",
         mean("available") AS "available"
  INTO "hourly"."mem_5min"
  FROM "raw"."mem"
  GROUP BY time(5m), *
END

EOF

Dikkat etmen gereken bir nokta: FROM kısmında "raw"."cpu" şeklinde retention policy adını da belirtmek en güvenli yaklaşımdır. Böylece hangi RP’den okuyacağını açıkça söylemiş olursun.

RESAMPLE ile Zaman Aralığı Kontrolü

CQ’nun en güçlü özelliklerinden biri RESAMPLE claus’udur. Bu, CQ’nun kaç sıklıkta çalışacağını ve geçmişe dönük hangi zaman aralığını kapsayacağını belirler.

CREATE CONTINUOUS QUERY "cq_network_hourly"
ON "monitoring"
RESAMPLE EVERY 30m FOR 2h
BEGIN
  SELECT sum("bytes_recv") AS "bytes_recv",
         sum("bytes_sent") AS "bytes_sent",
         mean("packets_recv") AS "packets_recv"
  INTO "hourly"."network_hourly"
  FROM "raw"."net"
  GROUP BY time(1h), *
END

Burada RESAMPLE EVERY 30m FOR 2h ne anlama geliyor:

  • EVERY 30m: Bu CQ her 30 dakikada bir çalışır (varsayılan olarak GROUP BY süresi kadardır)
  • FOR 2h: Her çalıştığında son 2 saatlik veriyi yeniden hesaplar

Neden FOR parametresi önemli? Telegraf veya başka bir agent geç veri gönderebilir. Ağ kesintisi, restart gibi durumlar verinin biraz gecikmeli gelmesine yol açabilir. FOR 2h diyerek son 2 saatlik pencereyi her seferinde yeniden hesaplarsın, böylece geç gelen verileri kaçırmazsın.

Gerçek Dünya Senaryosu 2: IoT Sensör Verisi Özetleme

Bir fabrikada 200 sıcaklık sensörü var. Her sensör saniyede bir veri gönderiyor. Bu ciddi bir veri yükü. Günde 200 * 86400 = yaklaşık 17.3 milyon satır.

influx -database "factory" << 'EOF'

-- Ham veri sadece 24 saat tutuluyor
CREATE RETENTION POLICY "realtime" ON "factory" DURATION 24h REPLICATION 1 DEFAULT
-- 1 dakikalık özetler 30 gün
CREATE RETENTION POLICY "minute" ON "factory" DURATION 30d REPLICATION 1
-- Saatlik özetler 2 yıl
CREATE RETENTION POLICY "hourly" ON "factory" DURATION 730d REPLICATION 1
-- Günlük özetler sonsuza kadar
CREATE RETENTION POLICY "daily" ON "factory" DURATION INF REPLICATION 1

-- Saniyeden dakikaya downsample
CREATE CONTINUOUS QUERY "cq_temp_1min"
ON "factory"
RESAMPLE EVERY 1m FOR 5m
BEGIN
  SELECT mean("temperature") AS "mean_temp",
         max("temperature") AS "max_temp",
         min("temperature") AS "min_temp",
         stddev("temperature") AS "stddev_temp",
         count("temperature") AS "sample_count"
  INTO "minute"."temperature_1min"
  FROM "realtime"."sensor_data"
  GROUP BY time(1m), *
END

-- Dakikadan saate downsample
CREATE CONTINUOUS QUERY "cq_temp_1h"
ON "factory"
RESAMPLE EVERY 30m FOR 2h
BEGIN
  SELECT mean("mean_temp") AS "mean_temp",
         max("max_temp") AS "max_temp",
         min("min_temp") AS "min_temp",
         mean("stddev_temp") AS "avg_stddev",
         sum("sample_count") AS "total_samples"
  INTO "hourly"."temperature_1h"
  FROM "minute"."temperature_1min"
  GROUP BY time(1h), *
END

-- Saatlikten günlüğe downsample
CREATE CONTINUOUS QUERY "cq_temp_1d"
ON "factory"
RESAMPLE EVERY 6h FOR 2d
BEGIN
  SELECT mean("mean_temp") AS "mean_temp",
         max("max_temp") AS "max_temp",
         min("min_temp") AS "min_temp",
         sum("total_samples") AS "total_samples"
  INTO "daily"."temperature_1d"
  FROM "hourly"."temperature_1h"
  GROUP BY time(1d), *
END

EOF

Bu kademeli downsample yaklaşımı çok önemli. Doğrudan saniyeden güne downsample yapmak yerine ara adımlar kullanmak hem daha doğru istatistikler üretir hem de CQ’nun işlem yükünü dengeler.

CQ ile Anomali Tespiti için Ön Hesaplama

CQ sadece basit ortalama almak için değil, daha karmaşık hesaplamalar için de kullanılabilir. Örneğin bir API gateway’in p95 latency değerlerini önceden hesaplayabilirsin:

CREATE CONTINUOUS QUERY "cq_api_latency_stats"
ON "production"
RESAMPLE EVERY 5m FOR 15m
BEGIN
  SELECT mean("response_time") AS "avg_latency",
         percentile("response_time", 50) AS "p50_latency",
         percentile("response_time", 95) AS "p95_latency",
         percentile("response_time", 99) AS "p99_latency",
         count("response_time") AS "request_count",
         sum("error_count") AS "total_errors"
  INTO "metrics"."api_latency_5min"
  FROM "raw_metrics"."api_requests"
  GROUP BY time(5m), "endpoint", "region", "service"
END

Burada GROUP BY içinde yıldız yerine belirli tag’leri listeledik. Bu, ihtiyaç duyulmayan tag’lerin karda işlemesi önlemek ve daha temiz bir veri yapısı elde etmek için tercih edilebilir.

Backfill: Geçmiş Veriyi Doldurma

Yeni bir CQ oluşturduğunda bu sadece gelecekteki verileri özetler, geçmiş veriler boş kalır. Geçmiş veriyi doldurmak için manuel backfill yapman gerekir:

# Geçmiş veri için manuel backfill
# CQ'nun yapacağı işi belirli bir zaman aralığı için elle çalıştır

influx -database "monitoring" << 'EOF'
SELECT mean("usage_user") AS "usage_user",
       max("usage_user") AS "max_usage_user",
       min("usage_user") AS "min_usage_user"
INTO "hourly"."cpu_hourly"
FROM "raw"."cpu"
WHERE time >= '2024-01-01T00:00:00Z' AND time < '2024-02-01T00:00:00Z'
GROUP BY time(1h), *
EOF

Uzun geçmiş periyotları için bunu script haline getirmek mantıklıdır:

#!/bin/bash
# backfill_cq.sh - Aylık backfill script'i

INFLUX_HOST="localhost"
INFLUX_PORT="8086"
DB="monitoring"
START_DATE="2024-01-01"
END_DATE="2024-06-01"

current_date="$START_DATE"

while [[ "$current_date" < "$END_DATE" ]]; do
    next_date=$(date -d "$current_date + 1 month" +%Y-%m-01)
    
    echo "Backfill yapılıyor: $current_date - $next_date"
    
    influx -host "$INFLUX_HOST" -port "$INFLUX_PORT" 
           -database "$DB" 
           -execute "
        SELECT mean("usage_user") AS "usage_user",
               max("usage_user") AS "max_usage_user"
        INTO "hourly"."cpu_hourly"
        FROM "raw"."cpu"
        WHERE time >= '${current_date}T00:00:00Z' 
          AND time < '${next_date}T00:00:00Z'
        GROUP BY time(1h), *
    "
    
    current_date="$next_date"
    sleep 2  # InfluxDB'ye nefes aldır
done

echo "Backfill tamamlandi!"

CQ Yönetimi ve Sorun Giderme

CQ’ların düzgün çalışıp çalışmadığını izlemek önemlidir. InfluxDB kendi iç metriklerini _internal veritabanında tutar:

# CQ execution istatistikleri
influx -database "_internal" -execute "
SELECT * FROM "continuousQuery" 
WHERE time > now() - 1h
ORDER BY time DESC
LIMIT 20
"

# CQ'nun ne zaman en son çalıştığını kontrol et
influx -database "_internal" -execute "
SELECT last("queryDurationNs"), last("pointsWrittenOK")
FROM "continuousQuery"
WHERE "db" = 'monitoring'
GROUP BY "name"
"

Bir CQ’yu silmek veya güncellemek gerektiğinde:

# CQ'yu sil
influx -database "monitoring" -execute "DROP CONTINUOUS QUERY cq_cpu_hourly ON monitoring"

# CQ'yu güncellemek için önce sil, sonra yeniden oluştur
# InfluxDB'de ALTER CONTINUOUS QUERY yoktur

# Mevcut CQ'nun tanımını gör
influx -execute "SHOW CONTINUOUS QUERIES"

CQ ile ilgili yaygın sorunlar ve çözümleri:

  • Veri yazılmıyor: INTO kısmındaki retention policy ve measurement adlarını kontrol et. RP’nin doğru oluşturulduğunu doğrula.
  • Gecikmeli veri kaçırılıyor: RESAMPLE FOR değerini artır, geç gelen verileri kapsayacak kadar geniş bir pencere belirle.
  • CQ çok fazla kaynak kullanıyor: RESAMPLE EVERY değerini artırarak daha seyrek çalıştır, FOR penceresini de buna göre ayarla.
  • Tag’ler özetlenmiş veriye geçmiyor: GROUP BY time(1h), * ifadesindeki yıldızı unutma. Yıldız olmadan tag’ler kaybolur.

InfluxDB 2.x ve Flux ile CQ Alternatifi

InfluxDB 2.x versiyonunda CQ resmi olarak kaldırıldı ve yerini Flux dili ile yazılan Task’lar aldı. Eğer InfluxDB 2.x kullanıyorsan aşağıdaki yaklaşımı benimsemelisin:

# InfluxDB 2.x'te task oluşturma (Flux dili)
influx task create --file cpu_downsample.flux

# cpu_downsample.flux dosyası içeriği:
cat > cpu_downsample.flux << 'FLUX'
option task = {
  name: "CPU Saatlik Downsample",
  every: 1h,
  offset: 5m
}

from(bucket: "monitoring/raw")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_user" or r._field == "usage_idle")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "monitoring/hourly", org: "myorg")
FLUX

InfluxDB 1.x kullanmaya devam ediyorsan CQ hala tam anlamıyla desteklenmekte ve production ortamlarda güvenle kullanılabilmektedir.

Pratik Tavsiyeler ve En İyi Uygulamalar

Gerçek ortamlarda CQ kullanırken öğrendiğim birkaç önemli nokta:

  • Naming convention belirle: CQ isimlerinde cq_[measurement]_[interval] formatını kullan. Örneğin cq_cpu_5min, cq_network_1h. Aylar sonra ne yaptığını anlaman çok daha kolay olur.
  • kullanımına dikkat et: GROUP BY time(1h), yıldızı kardinaliteyi artırabilir. Çok sayıda unique tag kombinasyonun varsa belirli tag’leri listele.
  • Kademeli downsample uygula: Ham veriden direkt günlüğe downsample etme. Saniye to dakika to saat to gün şeklinde kademeli git. İstatistiksel doğruluk açısından bu çok önemli.
  • RP ile CQ’yu birlikte planla: CQ oluştururken retention policy planını önceden yap. Sonradan değiştirmek veri kaybına yol açabilir.
  • İlk kurulumda backfill yapma: Yeni bir sisteme CQ kuruyorsan geçmişe dönük backfill script’ini hazırla ve çalıştır. Yoksa dashboard’larında büyük boşluklar görürsün.
  • Test ortamında dene: Production’a almadan önce küçük bir veri setiyle CQ’nun beklenen sonuçları ürettiğini doğrula. Özellikle RESAMPLE FOR değerinin geç gelen veriler için yeterli olduğunu test et.
  • Disk tasarrufunu ölç: CQ kurulumundan önce ve sonra disk kullanımını karşılaştır. Genellikle %70-90 arasında tasarruf elde edildiğini görürsün.

Sonuç

Continuous Query, InfluxDB’de veri yönetiminin bel kemiğidir. Ham veriyi gereksiz yere sonsuza kadar saklamak hem maliyetli hem de pratik değil. CQ ile ham veriyi kısa süre tutup, özetlenmiş verileri uzun vadeli saklayarak hem disk kullanımını optimize edersin hem de sorgu performansını ciddi ölçüde artırırsın.

Önemli noktalara bakalım: Retention Policy ve CQ birlikte planlanmalı, kademeli downsample tercih edilmeli, RESAMPLE parametreleri geç gelen verileri hesaba katacak şekilde ayarlanmalı ve yeni bir CQ kurulumunda backfill ihmal edilmemeli.

Özellikle IoT, izleme sistemleri ve log analizi gibi yüksek frekanslı veri üreten sistemlerde CQ kullanmadan InfluxDB yönetmek hem veritabanını yorar hem seni. Bu yapıyı bir kere doğru kurduğunda, aylarca yıllarca rahatça çalışır. Eğer InfluxDB 2.x veya InfluxDB Cloud kullanıyorsan Task API’ye geçişi de planlamanı öneririm çünkü Flux dili çok daha esnek ve ifade gücü yüksek bir yapı sunuyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir