Flux ile InfluxDB Sorgulama Rehberi
Eğer daha önce InfluxDB 1.x ile SQL benzeri InfluxQL kullandıysanız ve şimdi InfluxDB 2.x’e geçtiyseniz, karşınıza çıkan Flux dili ilk başta biraz yabancı gelebilir. Ancak birkaç saat harcadıktan sonra Flux’un ne kadar güçlü ve esnek bir sorgu dili olduğunu anlıyorsunuz. Bu yazıda Flux ile çalışmanın temellerinden başlayarak, gerçek dünya senaryolarına kadar kapsamlı bir rehber sunacağım.
Flux Nedir ve Neden Önemlidir?
Flux, InfluxData tarafından geliştirilen, veri sorgulama ve işleme için tasarlanmış fonksiyonel bir skript dilidir. InfluxQL’den temel farkı şudur: Flux sadece bir sorgu dili değil, aynı zamanda veri transformasyonu, matematiksel hesaplamalar ve hatta uyarı (alerting) mantığı yazabileceğiniz bir dildir.
Flux’un öne çıkan özellikleri şunlardır:
- Pipeline tabanlı yapı: Veriler
|>operatörü ile adım adım işlenir - Fonksiyonel yaklaşım: Her işlem bir fonksiyon çağrısıdır
- Zengin built-in fonksiyonlar: 200’den fazla hazır fonksiyon
- Çoklu veri kaynağı desteği: InfluxDB dışında CSV, SQL, HTTP kaynaklarından da veri çekebilirsiniz
- Veri birleştirme (join): Farklı measurement’lardan gelen verileri birleştirebilirsiniz
Temel Kavramlar
Bucket ve Measurement
InfluxDB 2.x’te veriler bucket’larda saklanır. Her bucket içinde measurement’lar, her measurement içinde field ve tag’lar bulunur. Flux sorgularında bu hiyerarşiyi anlamak kritiktir.
- Bucket: Veri depolama birimi (InfluxDB 1.x’teki database’e benzer)
- Measurement: Tablo benzeri yapı (InfluxDB 1.x’tekiyle aynı)
- Field: Asıl metrik değerleri (sayısal, string, boolean olabilir)
- Tag: İndekslenmiş metadata (her zaman string)
Pipeline Operatörü
Flux’un en karakteristik özelliği olan |> operatörü, bir fonksiyonun çıktısını bir sonraki fonksiyona aktarır. Unix pipe’larına çok benzer bir mantıkla çalışır.
from(bucket: "monitoring")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_idle")
|> mean()
Yukarıdaki sorguyu okuyalım: “monitoring bucket’ından son 1 saatlik veriyi al, cpu measurement’ını filtrele, usage_idle field’ını al ve ortalamasını hesapla.” Bu kadar okunabilir bir sözdizimiyle çalışmak gerçekten keyiflidir.
İlk Sorgularınız
Temel Veri Çekme
Her Flux sorgusu from() ile başlar ve mutlaka range() fonksiyonu kullanılmalıdır. InfluxDB zaman serisi veritabanı olduğu için zaman aralığı belirtmek zorunludur.
// Son 24 saatin tüm CPU verilerini getir
from(bucket: "servers")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "cpu")
Burada start: -24h ifadesi “şimdiden 24 saat öncesi” anlamına gelir. Başlangıç ve bitiş zamanını ayrı ayrı da belirtebilirsiniz:
// Belirli bir tarih aralığı
from(bucket: "servers")
|> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-07T23:59:59Z)
|> filter(fn: (r) => r._measurement == "cpu")
Filter Fonksiyonu ile Veri Seçimi
filter() fonksiyonu, satır bazında filtreleme yapar. fn parametresi bir lambda fonksiyon alır ve her satır için true veya false döner.
// Belirli bir sunucunun belirli bir field'ını filtrele
from(bucket: "servers")
|> range(start: -6h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_user" and
r.host == "web-server-01"
)
Birden fazla tag değerine göre filtreleme için or kullanabilirsiniz:
// Birden fazla sunucu için filtreleme
from(bucket: "servers")
|> range(start: -6h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_user" and
(r.host == "web-server-01" or r.host == "web-server-02" or r.host == "db-server-01")
)
Aggregation ve Gruplama
Temel Aggregation Fonksiyonları
Flux’ta sık kullanılan aggregation fonksiyonları şunlardır:
- mean(): Aritmetik ortalama
- sum(): Toplam
- count(): Satır sayısı
- min(): En küçük değer
- max(): En büyük değer
- median(): Medyan değer
- stddev(): Standart sapma
- percentile(): Yüzdelik dilim hesaplama
// Son 1 saatlik CPU kullanımının ortalaması
from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> mean()
aggregateWindow ile Zaman Bazlı Örnekleme
Gerçek dünya senaryolarında en çok ihtiyaç duyduğunuz fonksiyonlardan biri aggregateWindow()dur. Grafana’da zaman serisi grafikleri oluştururken bu fonksiyon olmadan düzgün bir görüntü elde edemezsiniz.
// Her 5 dakikalık CPU ortalaması
from(bucket: "servers")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
Burada createEmpty: false parametresi önemlidir. Eğer bir zaman penceresinde veri yoksa boş satır oluşturulmamasını sağlar. Production ortamında genellikle bunu false olarak bırakmak daha temiz sonuçlar verir.
group() ile Gruplama
// Her sunucu için ayrı ayrı CPU ortalaması
from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> group(columns: ["host"])
|> mean()
Gerçek Dünya Senaryoları
Senaryo 1: Disk Doluluk Uyarısı
Bir sistemin disk kullanımını izliyorsunuz ve %85’in üzerindeki durumlarda uyarı almak istiyorsunuz. Telegraf ile topladığınız disk metriklerini sorgulayalım:
// Disk kullanımı %85'ten fazla olan sunucular
from(bucket: "infrastructure")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "disk" and
r._field == "used_percent"
)
|> filter(fn: (r) => r._value > 85.0)
|> last()
|> map(fn: (r) => ({r with
alert_message: "UYARI: " + r.host + " sunucusunda " + r.path + " dizini doluluk orani: " + string(v: r._value) + "%"
}))
Bu sorguda map() fonksiyonunu kullandım. map(), her satır için yeni bir dönüşüm uygulamanızı sağlar ve burada özel bir uyarı mesajı oluşturuyoruz.
Senaryo 2: HTTP Response Süresi Analizi
Web uygulamanızın response sürelerini izliyorsunuz. 95. yüzdelik dilimin son 30 dakika içinde nasıl değiştiğini görmek istiyorsunuz:
// HTTP response süresi P95 analizi - her 1 dakikalık pencere
from(bucket: "application")
|> range(start: -30m)
|> filter(fn: (r) =>
r._measurement == "http_response" and
r._field == "response_time" and
r.server == "api.example.com"
)
|> aggregateWindow(
every: 1m,
fn: (tables=<-, column) => tables |> quantile(q: 0.95, column: column),
createEmpty: false
)
Bu biraz daha gelişmiş bir kullanım. aggregateWindow içinde custom bir fonksiyon tanımlayarak 95. yüzdelik dilimi hesaplıyoruz.
Senaryo 3: Bellek Kullanımı Trend Analizi
Bir sunucunun bellek kullanımının son 1 haftadaki trendini analiz etmek için timedMovingAverage() fonksiyonunu kullanabiliriz:
// 1 haftalık bellek kullanımı hareketli ortalama (6 saatlik pencere)
from(bucket: "servers")
|> range(start: -7d)
|> filter(fn: (r) =>
r._measurement == "mem" and
r._field == "used_percent" and
r.host == "db-master-01"
)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> timedMovingAverage(every: 1h, period: 6h)
Bu sorgu her saat için ortalama bellek kullanımını hesaplar, ardından 6 saatlik kayan ortalama uygular. Kısa vadeli dalgalanmaları filtreler ve genel trendi daha net gösterir.
Veri Dönüşümleri
pivot() Fonksiyonu
Bazen aynı measurement içindeki farklı field’ları yan yana görmek istersiniz. pivot() fonksiyonu bu işi yapar:
// CPU field'larını geniş tablo formatına çevir
from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r.host == "web-01")
|> filter(fn: (r) =>
r._field == "usage_user" or
r._field == "usage_system" or
r._field == "usage_idle"
)
|> pivot(
rowKey: ["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
Bu sorgudan sonra her satırda usage_user, usage_system ve usage_idle kolonları yan yana gelecek. Grafana tablolarında bu format çok işe yarar.
join() ile İki Farklı Measurement’ı Birleştirme
// CPU ve Bellek kullanımını birleştir
cpu_data = from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> rename(columns: {_value: "cpu_usage"})
mem_data = from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> rename(columns: {_value: "mem_usage"})
join(
tables: {cpu: cpu_data, mem: mem_data},
on: ["_time", "host"]
)
Bu tür sorgular özellikle korelasyon analizi için çok değerlidir. CPU spike’ları ile bellek kullanımı arasında bir ilişki var mı diye araştırırken bu pattern’ı sıkça kullanıyorum.
Değişkenler ve Kod Organizasyonu
Flux’ta değişken tanımlamak sorguları hem okunabilir hem de yeniden kullanılabilir hale getirir:
// Değişkenlerle temiz sorgu yazımı
bucket_name = "infrastructure"
time_range = -6h
target_host = "prod-db-01"
threshold = 80.0
yuksek_cpu = from(bucket: bucket_name)
|> range(start: time_range)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_user" and
r.host == target_host
)
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
|> filter(fn: (r) => r._value > threshold)
yuksek_cpu
|> count()
Bu yaklaşım özellikle InfluxDB’nin task (scheduled query) özelliğini kullanırken çok işe yarar. Değişkenleri üstte tanımlayıp tek noktadan yönetebilirsiniz.
Performans İpuçları
Flux sorguları yazarken production ortamında performansı ciddi şekilde etkileyen bazı noktalar var. Bunları öğrenmesi biraz zaman alıyor ama sysadmin deneyimimden derlediğim ipuçları şunlar:
1. Filter’ları Erken Uygulayın
filter() fonksiyonlarını pipeline’ın başına taşımak, işlenecek veri miktarını erken aşamada azaltır. Özellikle _measurement ve _field filtrelerini her zaman önce uygulayın.
Kötü pratik:
from(bucket: "servers")
|> range(start: -7d)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
İyi pratik:
from(bucket: "servers")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
2. Range Sürelerini Makul Tutun
Gereksiz yere uzun zaman aralıkları sorgu süresini katlar. Grafana dashboardlarında v.timeRangeStart ve v.timeRangeStop template değişkenlerini kullanın:
from(bucket: "servers")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "cpu")
3. limit() ile Test Sorguları
Büyük veri setlerinde yeni bir sorgu yazarken önce limit() ile veri miktarını sınırlandırın:
from(bucket: "servers")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "cpu")
|> limit(n: 100)
task() ile Zamanlanmış Sorgular
InfluxDB’nin en güçlü özelliklerinden biri, Flux sorgularını zamanlanmış görev olarak çalıştırabilmektir. Örneğin her saat başı CPU verilerini downsampling yaparak uzun dönem saklayalım:
option task = {
name: "CPU Downsampling - Hourly",
every: 1h,
offset: 5m
}
from(bucket: "servers")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> to(bucket: "servers_downsampled", org: "myorg")
Bu task her saat başından 5 dakika sonra çalışır (offset sayesinde), son 1 saatlik CPU verilerinin ortalamasını hesaplar ve servers_downsampled bucket’ına yazar. Bu pattern ile ham verileri 30 gün, downsampled verileri 1 yıl saklayan bir retention stratejisi kurabilirsiniz.
Hata Ayıklama
Flux sorgularını debug ederken yield() fonksiyonu çok işe yarar. Pipeline’ın herhangi bir noktasındaki veriyi incelemenizi sağlar:
from(bucket: "servers")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> yield(name: "ham_veri") // Bu noktadaki veriyi göster
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> yield(name: "islenmis_veri") // Bu noktadaki veriyi göster
InfluxDB UI’daki Data Explorer’da her iki yield çıktısını da ayrı ayrı görebilirsiniz. Hangi aşamada ne kadar veri olduğunu ve dönüşümlerin doğru çalışıp çalışmadığını bu şekilde kontrol ederim.
InfluxDB CLI ile Flux Kullanımı
Komut satırından Flux sorguları çalıştırmak için influx query komutunu kullanabilirsiniz:
# Tek satır sorgu
influx query 'from(bucket:"servers") |> range(start:-1h) |> filter(fn:(r) => r._measurement == "cpu") |> mean()'
# Dosyadan sorgu çalıştırma
influx query --file /etc/influxdb/queries/cpu_check.flux
# Token ile uzak sunucuya bağlanarak sorgu
influx query
--host http://influxdb.example.com:8086
--token "your-api-token-here"
--org "myorg"
'from(bucket:"servers") |> range(start:-5m) |> filter(fn:(r) => r._measurement == "cpu") |> last()'
Script’lerinizde bu komutları kullanarak otomatik raporlar veya health check’ler oluşturabilirsiniz.
Sık Yapılan Hatalar
Flux öğrenirken herkesin takıldığı noktalar var:
- range() unutmak: Her sorgu mutlaka
range()içermeli, yoksa “query must specify a start time” hatası alırsınız - _field ve _value karışıklığı:
r._fieldfield adını,r._valueo field’ın değerini tutar. Filtreleri buna göre yazın - Büyük/küçük harf duyarlılığı:
r._measurement == "CPU"iler._measurement == "cpu"farklı sonuçlar verir, verilerinizin nasıl yazıldığını kontrol edin - aggregateWindow sonrası filter: Aggregation sonrası
_valuedeğerine göre filtreleme yapılabilir ama bunu yapabilmek içinaggregateWindowdan sonra filter uygulamanız gerekir - group() davranışı: Bazı fonksiyonlar çalışmadan önce doğru gruplama yapıldığından emin olun
Sonuç
Flux, ilk bakışta InfluxQL’e kıyasla fazla karmaşık görünebilir. Ancak birkaç gün düzenli kullandıktan sonra pipeline mantığının ne kadar güçlü olduğunu fark ediyorsunuz. Özellikle join(), pivot() ve custom aggregation fonksiyonları sayesinde InfluxQL ile yapamayacağınız analizleri kolaylıkla gerçekleştirebiliyorsunuz.
Başlangıç için şu yolu öneriyorum: Önce basit from, range, filter, mean kombinasyonlarıyla alıştırma yapın. Sonra aggregateWindow ile zaman bazlı örneklemeye geçin. Ardından map() ve pivot() ile veri dönüşümlerine bakın. En son olarak task() ile zamanlanmış görevler oluşturun.
Telegraf ile topladığınız metrikleri Flux ile sorgulayıp Grafana’da görselleştirdiğinizde elimde olmadan bir tatmin hissi oluşuyor. Zaman serisi verilerinin bu kadar güçlü bir şekilde işlenebilmesi, özellikle infrastructure monitoring konusunda sysadmin hayatını gerçekten kolaylaştırıyor.
