Flux ile InfluxDB Sorgulama Rehberi

Eğer daha önce InfluxDB 1.x ile SQL benzeri InfluxQL kullandıysanız ve şimdi InfluxDB 2.x’e geçtiyseniz, karşınıza çıkan Flux dili ilk başta biraz yabancı gelebilir. Ancak birkaç saat harcadıktan sonra Flux’un ne kadar güçlü ve esnek bir sorgu dili olduğunu anlıyorsunuz. Bu yazıda Flux ile çalışmanın temellerinden başlayarak, gerçek dünya senaryolarına kadar kapsamlı bir rehber sunacağım.

Flux Nedir ve Neden Önemlidir?

Flux, InfluxData tarafından geliştirilen, veri sorgulama ve işleme için tasarlanmış fonksiyonel bir skript dilidir. InfluxQL’den temel farkı şudur: Flux sadece bir sorgu dili değil, aynı zamanda veri transformasyonu, matematiksel hesaplamalar ve hatta uyarı (alerting) mantığı yazabileceğiniz bir dildir.

Flux’un öne çıkan özellikleri şunlardır:

  • Pipeline tabanlı yapı: Veriler |> operatörü ile adım adım işlenir
  • Fonksiyonel yaklaşım: Her işlem bir fonksiyon çağrısıdır
  • Zengin built-in fonksiyonlar: 200’den fazla hazır fonksiyon
  • Çoklu veri kaynağı desteği: InfluxDB dışında CSV, SQL, HTTP kaynaklarından da veri çekebilirsiniz
  • Veri birleştirme (join): Farklı measurement’lardan gelen verileri birleştirebilirsiniz

Temel Kavramlar

Bucket ve Measurement

InfluxDB 2.x’te veriler bucket’larda saklanır. Her bucket içinde measurement’lar, her measurement içinde field ve tag’lar bulunur. Flux sorgularında bu hiyerarşiyi anlamak kritiktir.

  • Bucket: Veri depolama birimi (InfluxDB 1.x’teki database’e benzer)
  • Measurement: Tablo benzeri yapı (InfluxDB 1.x’tekiyle aynı)
  • Field: Asıl metrik değerleri (sayısal, string, boolean olabilir)
  • Tag: İndekslenmiş metadata (her zaman string)

Pipeline Operatörü

Flux’un en karakteristik özelliği olan |> operatörü, bir fonksiyonun çıktısını bir sonraki fonksiyona aktarır. Unix pipe’larına çok benzer bir mantıkla çalışır.

from(bucket: "monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_idle")
  |> mean()

Yukarıdaki sorguyu okuyalım: “monitoring bucket’ından son 1 saatlik veriyi al, cpu measurement’ını filtrele, usage_idle field’ını al ve ortalamasını hesapla.” Bu kadar okunabilir bir sözdizimiyle çalışmak gerçekten keyiflidir.

İlk Sorgularınız

Temel Veri Çekme

Her Flux sorgusu from() ile başlar ve mutlaka range() fonksiyonu kullanılmalıdır. InfluxDB zaman serisi veritabanı olduğu için zaman aralığı belirtmek zorunludur.

// Son 24 saatin tüm CPU verilerini getir
from(bucket: "servers")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu")

Burada start: -24h ifadesi “şimdiden 24 saat öncesi” anlamına gelir. Başlangıç ve bitiş zamanını ayrı ayrı da belirtebilirsiniz:

// Belirli bir tarih aralığı
from(bucket: "servers")
  |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-07T23:59:59Z)
  |> filter(fn: (r) => r._measurement == "cpu")

Filter Fonksiyonu ile Veri Seçimi

filter() fonksiyonu, satır bazında filtreleme yapar. fn parametresi bir lambda fonksiyon alır ve her satır için true veya false döner.

// Belirli bir sunucunun belirli bir field'ını filtrele
from(bucket: "servers")
  |> range(start: -6h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_user" and
    r.host == "web-server-01"
  )

Birden fazla tag değerine göre filtreleme için or kullanabilirsiniz:

// Birden fazla sunucu için filtreleme
from(bucket: "servers")
  |> range(start: -6h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_user" and
    (r.host == "web-server-01" or r.host == "web-server-02" or r.host == "db-server-01")
  )

Aggregation ve Gruplama

Temel Aggregation Fonksiyonları

Flux’ta sık kullanılan aggregation fonksiyonları şunlardır:

  • mean(): Aritmetik ortalama
  • sum(): Toplam
  • count(): Satır sayısı
  • min(): En küçük değer
  • max(): En büyük değer
  • median(): Medyan değer
  • stddev(): Standart sapma
  • percentile(): Yüzdelik dilim hesaplama
// Son 1 saatlik CPU kullanımının ortalaması
from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> mean()

aggregateWindow ile Zaman Bazlı Örnekleme

Gerçek dünya senaryolarında en çok ihtiyaç duyduğunuz fonksiyonlardan biri aggregateWindow()dur. Grafana’da zaman serisi grafikleri oluştururken bu fonksiyon olmadan düzgün bir görüntü elde edemezsiniz.

// Her 5 dakikalık CPU ortalaması
from(bucket: "servers")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

Burada createEmpty: false parametresi önemlidir. Eğer bir zaman penceresinde veri yoksa boş satır oluşturulmamasını sağlar. Production ortamında genellikle bunu false olarak bırakmak daha temiz sonuçlar verir.

group() ile Gruplama

// Her sunucu için ayrı ayrı CPU ortalaması
from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> group(columns: ["host"])
  |> mean()

Gerçek Dünya Senaryoları

Senaryo 1: Disk Doluluk Uyarısı

Bir sistemin disk kullanımını izliyorsunuz ve %85’in üzerindeki durumlarda uyarı almak istiyorsunuz. Telegraf ile topladığınız disk metriklerini sorgulayalım:

// Disk kullanımı %85'ten fazla olan sunucular
from(bucket: "infrastructure")
  |> range(start: -5m)
  |> filter(fn: (r) =>
    r._measurement == "disk" and
    r._field == "used_percent"
  )
  |> filter(fn: (r) => r._value > 85.0)
  |> last()
  |> map(fn: (r) => ({r with
    alert_message: "UYARI: " + r.host + " sunucusunda " + r.path + " dizini doluluk orani: " + string(v: r._value) + "%"
  }))

Bu sorguda map() fonksiyonunu kullandım. map(), her satır için yeni bir dönüşüm uygulamanızı sağlar ve burada özel bir uyarı mesajı oluşturuyoruz.

Senaryo 2: HTTP Response Süresi Analizi

Web uygulamanızın response sürelerini izliyorsunuz. 95. yüzdelik dilimin son 30 dakika içinde nasıl değiştiğini görmek istiyorsunuz:

// HTTP response süresi P95 analizi - her 1 dakikalık pencere
from(bucket: "application")
  |> range(start: -30m)
  |> filter(fn: (r) =>
    r._measurement == "http_response" and
    r._field == "response_time" and
    r.server == "api.example.com"
  )
  |> aggregateWindow(
    every: 1m,
    fn: (tables=<-, column) => tables |> quantile(q: 0.95, column: column),
    createEmpty: false
  )

Bu biraz daha gelişmiş bir kullanım. aggregateWindow içinde custom bir fonksiyon tanımlayarak 95. yüzdelik dilimi hesaplıyoruz.

Senaryo 3: Bellek Kullanımı Trend Analizi

Bir sunucunun bellek kullanımının son 1 haftadaki trendini analiz etmek için timedMovingAverage() fonksiyonunu kullanabiliriz:

// 1 haftalık bellek kullanımı hareketli ortalama (6 saatlik pencere)
from(bucket: "servers")
  |> range(start: -7d)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used_percent" and
    r.host == "db-master-01"
  )
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> timedMovingAverage(every: 1h, period: 6h)

Bu sorgu her saat için ortalama bellek kullanımını hesaplar, ardından 6 saatlik kayan ortalama uygular. Kısa vadeli dalgalanmaları filtreler ve genel trendi daha net gösterir.

Veri Dönüşümleri

pivot() Fonksiyonu

Bazen aynı measurement içindeki farklı field’ları yan yana görmek istersiniz. pivot() fonksiyonu bu işi yapar:

// CPU field'larını geniş tablo formatına çevir
from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r.host == "web-01")
  |> filter(fn: (r) =>
    r._field == "usage_user" or
    r._field == "usage_system" or
    r._field == "usage_idle"
  )
  |> pivot(
    rowKey: ["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )

Bu sorgudan sonra her satırda usage_user, usage_system ve usage_idle kolonları yan yana gelecek. Grafana tablolarında bu format çok işe yarar.

join() ile İki Farklı Measurement’ı Birleştirme

// CPU ve Bellek kullanımını birleştir
cpu_data = from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "cpu_usage"})

mem_data = from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "mem_usage"})

join(
  tables: {cpu: cpu_data, mem: mem_data},
  on: ["_time", "host"]
)

Bu tür sorgular özellikle korelasyon analizi için çok değerlidir. CPU spike’ları ile bellek kullanımı arasında bir ilişki var mı diye araştırırken bu pattern’ı sıkça kullanıyorum.

Değişkenler ve Kod Organizasyonu

Flux’ta değişken tanımlamak sorguları hem okunabilir hem de yeniden kullanılabilir hale getirir:

// Değişkenlerle temiz sorgu yazımı
bucket_name = "infrastructure"
time_range = -6h
target_host = "prod-db-01"
threshold = 80.0

yuksek_cpu = from(bucket: bucket_name)
  |> range(start: time_range)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_user" and
    r.host == target_host
  )
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> filter(fn: (r) => r._value > threshold)

yuksek_cpu
  |> count()

Bu yaklaşım özellikle InfluxDB’nin task (scheduled query) özelliğini kullanırken çok işe yarar. Değişkenleri üstte tanımlayıp tek noktadan yönetebilirsiniz.

Performans İpuçları

Flux sorguları yazarken production ortamında performansı ciddi şekilde etkileyen bazı noktalar var. Bunları öğrenmesi biraz zaman alıyor ama sysadmin deneyimimden derlediğim ipuçları şunlar:

1. Filter’ları Erken Uygulayın

filter() fonksiyonlarını pipeline’ın başına taşımak, işlenecek veri miktarını erken aşamada azaltır. Özellikle _measurement ve _field filtrelerini her zaman önce uygulayın.

Kötü pratik:

from(bucket: "servers")
  |> range(start: -7d)
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")

İyi pratik:

from(bucket: "servers")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

2. Range Sürelerini Makul Tutun

Gereksiz yere uzun zaman aralıkları sorgu süresini katlar. Grafana dashboardlarında v.timeRangeStart ve v.timeRangeStop template değişkenlerini kullanın:

from(bucket: "servers")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "cpu")

3. limit() ile Test Sorguları

Büyük veri setlerinde yeni bir sorgu yazarken önce limit() ile veri miktarını sınırlandırın:

from(bucket: "servers")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> limit(n: 100)

task() ile Zamanlanmış Sorgular

InfluxDB’nin en güçlü özelliklerinden biri, Flux sorgularını zamanlanmış görev olarak çalıştırabilmektir. Örneğin her saat başı CPU verilerini downsampling yaparak uzun dönem saklayalım:

option task = {
  name: "CPU Downsampling - Hourly",
  every: 1h,
  offset: 5m
}

from(bucket: "servers")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "servers_downsampled", org: "myorg")

Bu task her saat başından 5 dakika sonra çalışır (offset sayesinde), son 1 saatlik CPU verilerinin ortalamasını hesaplar ve servers_downsampled bucket’ına yazar. Bu pattern ile ham verileri 30 gün, downsampled verileri 1 yıl saklayan bir retention stratejisi kurabilirsiniz.

Hata Ayıklama

Flux sorgularını debug ederken yield() fonksiyonu çok işe yarar. Pipeline’ın herhangi bir noktasındaki veriyi incelemenizi sağlar:

from(bucket: "servers")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> yield(name: "ham_veri")  // Bu noktadaki veriyi göster
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
  |> yield(name: "islenmis_veri")  // Bu noktadaki veriyi göster

InfluxDB UI’daki Data Explorer’da her iki yield çıktısını da ayrı ayrı görebilirsiniz. Hangi aşamada ne kadar veri olduğunu ve dönüşümlerin doğru çalışıp çalışmadığını bu şekilde kontrol ederim.

InfluxDB CLI ile Flux Kullanımı

Komut satırından Flux sorguları çalıştırmak için influx query komutunu kullanabilirsiniz:

# Tek satır sorgu
influx query 'from(bucket:"servers") |> range(start:-1h) |> filter(fn:(r) => r._measurement == "cpu") |> mean()'

# Dosyadan sorgu çalıştırma
influx query --file /etc/influxdb/queries/cpu_check.flux

# Token ile uzak sunucuya bağlanarak sorgu
influx query 
  --host http://influxdb.example.com:8086 
  --token "your-api-token-here" 
  --org "myorg" 
  'from(bucket:"servers") |> range(start:-5m) |> filter(fn:(r) => r._measurement == "cpu") |> last()'

Script’lerinizde bu komutları kullanarak otomatik raporlar veya health check’ler oluşturabilirsiniz.

Sık Yapılan Hatalar

Flux öğrenirken herkesin takıldığı noktalar var:

  • range() unutmak: Her sorgu mutlaka range() içermeli, yoksa “query must specify a start time” hatası alırsınız
  • _field ve _value karışıklığı: r._field field adını, r._value o field’ın değerini tutar. Filtreleri buna göre yazın
  • Büyük/küçük harf duyarlılığı: r._measurement == "CPU" ile r._measurement == "cpu" farklı sonuçlar verir, verilerinizin nasıl yazıldığını kontrol edin
  • aggregateWindow sonrası filter: Aggregation sonrası _value değerine göre filtreleme yapılabilir ama bunu yapabilmek için aggregateWindowdan sonra filter uygulamanız gerekir
  • group() davranışı: Bazı fonksiyonlar çalışmadan önce doğru gruplama yapıldığından emin olun

Sonuç

Flux, ilk bakışta InfluxQL’e kıyasla fazla karmaşık görünebilir. Ancak birkaç gün düzenli kullandıktan sonra pipeline mantığının ne kadar güçlü olduğunu fark ediyorsunuz. Özellikle join(), pivot() ve custom aggregation fonksiyonları sayesinde InfluxQL ile yapamayacağınız analizleri kolaylıkla gerçekleştirebiliyorsunuz.

Başlangıç için şu yolu öneriyorum: Önce basit from, range, filter, mean kombinasyonlarıyla alıştırma yapın. Sonra aggregateWindow ile zaman bazlı örneklemeye geçin. Ardından map() ve pivot() ile veri dönüşümlerine bakın. En son olarak task() ile zamanlanmış görevler oluşturun.

Telegraf ile topladığınız metrikleri Flux ile sorgulayıp Grafana’da görselleştirdiğinizde elimde olmadan bir tatmin hissi oluşuyor. Zaman serisi verilerinin bu kadar güçlü bir şekilde işlenebilmesi, özellikle infrastructure monitoring konusunda sysadmin hayatını gerçekten kolaylaştırıyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir