Elasticsearch Transform API ile Özet Tablo ve Veri Agregasyonu Oluşturma

Büyük log altyapılarında zamanla şöyle bir problemle karşılaşırsınız: Ham veriler Elasticsearch’te birikir, Kibana’da dashboard oluştururken her sorgu binlerce hatta milyonlarca doküman üzerinde çalışır ve performans giderek kötüleşir. İşte tam bu noktada Transform API devreye giriyor. Transform API, ham verilerinizi önceden agregasyona tabi tutarak özet tablolar oluşturmanızı sağlar. Böylece dashboard sorgularınız artık ham veri yerine hazır özet tablolara bakar, sorgu süreleriniz dramatik şekilde düşer.

Transform API Nedir ve Ne Zaman Kullanılır

Elasticsearch Transform API, temelde iki mod üzerinde çalışır: pivot ve latest. Pivot modu, SQL’deki GROUP BY mantığına çok benzer şekilde verileri gruplandırıp agregasyon uygular. Latest modu ise her bir grup için en son kaydı getirir.

Şu senaryolarda Transform API kullanmayı ciddi şekilde düşünmelisiniz:

  • Kibana dashboard’larınız yüksek kardinaliteli alanlarda terms aggregation yapıyorsa
  • Saatlik, günlük veya haftalık özet raporlara ihtiyaç duyuyorsanız
  • Anomali tespiti için baseline metrikler oluşturmanız gerekiyorsa
  • Farklı indekslerden gelen verileri tek bir özet yapıda birleştirmek istiyorsanız
  • Gerçek zamanlı olmak zorunda olmayan ama sık sorgulanan verilere hizmet veriyorsanız

Şunu netleştirelim: Transform API bir stream processing aracı değildir. Sürekli ya da scheduled aralıklarla çalışır ve bir hedef indeks üretir. Bu hedef indeks, sonraki sorgularınızın ham veri yerine bakacağı yer olur.

Temel Kavramlar

Transform API’yi anlamak için birkaç kavramı netleştirmek gerekiyor.

Source index: Ham verilerinizin bulunduğu indeks ya da indeks pattern’i. Wildcard kullanabilirsiniz.

Destination index: Transform işleminin sonucunun yazılacağı yeni indeks.

Sync field: Continuous transform’larda hangi alanın senkronizasyon için kullanılacağını belirtir. Genellikle @timestamp kullanılır.

Pivot: Gruplama ve agregasyon tanımının yapıldığı bölüm.

Frequency: Continuous transform’ın ne sıklıkla çalışacağını belirler. 1m, 5m, 1h gibi değerler alır.

İlk Transform’ı Oluşturmak

Pratik bir senaryodan gidelim. Nginx access loglarınızı Filebeat ile topluyorsunuz ve her istek nginx-access-* indeksine yazılıyor. Amacınız: Her HTTP status kodu için saatlik istek sayısını, ortalama response time’ı ve toplam byte transfer’ini özetlemek.

Önce kaynak verinizin yapısını kontrol edelim:

curl -X GET "localhost:9200/nginx-access-2024.01.01/_mapping?pretty" 
  -H "Content-Type: application/json"

Şimdi transform tanımını oluşturalım:

curl -X PUT "localhost:9200/_transform/nginx_hourly_summary" 
  -H "Content-Type: application/json" 
  -d '{
    "source": {
      "index": ["nginx-access-*"],
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-90d"
          }
        }
      }
    },
    "pivot": {
      "group_by": {
        "hour_bucket": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1h"
          }
        },
        "status_code": {
          "terms": {
            "field": "http.response.status_code"
          }
        },
        "upstream_host": {
          "terms": {
            "field": "destination.domain.keyword"
          }
        }
      },
      "aggregations": {
        "request_count": {
          "value_count": {
            "field": "_id"
          }
        },
        "avg_response_time": {
          "avg": {
            "field": "http.response.duration"
          }
        },
        "p95_response_time": {
          "percentiles": {
            "field": "http.response.duration",
            "percents": [95]
          }
        },
        "total_bytes_sent": {
          "sum": {
            "field": "http.response.bytes"
          }
        },
        "max_response_time": {
          "max": {
            "field": "http.response.duration"
          }
        }
      }
    },
    "dest": {
      "index": "nginx-hourly-summary"
    },
    "description": "Nginx saatlik ozet - status code ve host bazinda",
    "frequency": "5m",
    "sync": {
      "time": {
        "field": "@timestamp",
        "delay": "60s"
      }
    }
  }'

delay parametresi önemli. Elasticsearch’e “son 60 saniyeyi henüz işleme, veriler hala geliyor olabilir” diyorsunuz. Yoğun log akışlarında bu değeri artırmak tutarlılığı artırır.

Transform’ı Başlatmak ve Yönetmek

Transform oluşturuldu ama henüz çalışmıyor. Başlatmak için:

# Transform'i baslat
curl -X POST "localhost:9200/_transform/nginx_hourly_summary/_start"

# Durumunu kontrol et
curl -X GET "localhost:9200/_transform/nginx_hourly_summary/_stats?pretty"

Stats çıktısında dikkat etmeniz gereken alanlar:

  • state: started, stopped, failed, aborting değerlerini alır
  • documents_processed: Kaç doküman işlendiğini gösterir
  • trigger_count: Kaç kez tetiklendiğini gösterir
  • index_time_in_ms: Hedef indekse yazma süresi
  • search_time_in_ms: Kaynak indeksten okuma süresi

Eğer bir transform’ı durdurmak isterseniz:

curl -X POST "localhost:9200/_transform/nginx_hourly_summary/_stop?wait_for_completion=true"

Continuous vs Batch Transform

Transform API iki farklı modda çalışabilir. Şimdiye kadar örneklediğimiz continuous transform, sync alanı tanımlandığında aktif olur ve sürekli kaynak indeksi izleyerek hedef indeksi günceller.

Batch transform ise sync alanı olmadığında devreye girer. Tek seferlik çalışır, işi bitince stopped durumuna geçer. Bu mod özellikle geçmiş verileri backfill etmek için idealdir.

curl -X PUT "localhost:9200/_transform/nginx_monthly_backfill" 
  -H "Content-Type: application/json" 
  -d '{
    "source": {
      "index": ["nginx-access-2023.*"],
      "query": {
        "bool": {
          "filter": [
            {
              "range": {
                "@timestamp": {
                  "gte": "2023-01-01",
                  "lte": "2023-12-31"
                }
              }
            }
          ]
        }
      }
    },
    "pivot": {
      "group_by": {
        "day_bucket": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1d"
          }
        },
        "http_method": {
          "terms": {
            "field": "http.request.method.keyword"
          }
        }
      },
      "aggregations": {
        "total_requests": {
          "value_count": {
            "field": "_id"
          }
        },
        "error_rate": {
          "filter": {
            "range": {
              "http.response.status_code": {
                "gte": 400
              }
            }
          }
        }
      }
    },
    "dest": {
      "index": "nginx-2023-daily-backfill"
    },
    "description": "2023 yili gunluk backfill - tek seferlik"
  }'

Gerçek Dünya Senaryosu: Uygulama Performans İzleme

Bir e-ticaret platformu yönetiyorsunuz. APM verileri apm-* indekslerine yazılıyor ve her checkout işlemi loglanıyor. Pazarlama ekibi sabah toplantılarında dün hangi saatler arasında checkout conversion’larının düştüğünü sormaya başladı. Her seferinde Kibana’da sorgu çalıştırmak yerine hazır bir özet tablo olsun istiyorsunuz.

curl -X PUT "localhost:9200/_transform/apm_checkout_performance" 
  -H "Content-Type: application/json" 
  -d '{
    "source": {
      "index": ["apm-*-transaction*"],
      "query": {
        "bool": {
          "filter": [
            {
              "term": {
                "transaction.type": "request"
              }
            },
            {
              "term": {
                "service.name": "checkout-service"
              }
            }
          ]
        }
      }
    },
    "pivot": {
      "group_by": {
        "time_bucket": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "30m"
          }
        },
        "transaction_name": {
          "terms": {
            "field": "transaction.name.keyword"
          }
        },
        "outcome": {
          "terms": {
            "field": "event.outcome.keyword"
          }
        }
      },
      "aggregations": {
        "transaction_count": {
          "value_count": {
            "field": "transaction.id"
          }
        },
        "avg_duration_ms": {
          "avg": {
            "field": "transaction.duration.us",
            "missing": 0
          }
        },
        "p99_duration_ms": {
          "percentiles": {
            "field": "transaction.duration.us",
            "percents": [50, 75, 95, 99]
          }
        },
        "throughput": {
          "rate": {
            "unit": "minute"
          }
        }
      }
    },
    "dest": {
      "index": "apm-checkout-summary",
      "pipeline": "checkout-summary-enrich"
    },
    "frequency": "10m",
    "sync": {
      "time": {
        "field": "@timestamp",
        "delay": "120s"
      }
    },
    "settings": {
      "max_page_search_size": 500,
      "docs_per_second": 1000
    }
  }'

settings bölümündeki parametreler production ortamında kritik önem taşıyor:

  • max_page_search_size: Her arama döngüsünde kaç doküman alınacağını belirler. Büyük veri setlerinde bu değeri düşürmek kaynak kullanımını dengelemenizi sağlar.
  • docs_per_second: Throttle mekanizması. Cluster üzerindeki yükü kontrol altında tutmak için kullanılır. Kritik saatler için bunu null yaparak kaldırabilir, off-peak saatler için düşük tutabilirsiniz.

Transform Preview ile Test Etmek

Transform’ı gerçekten oluşturmadan önce nasıl bir çıktı ürettiğini görmek için preview endpoint’ini kullanın. Bu adımı atlamamanızı şiddetle tavsiye ederim, çünkü yanlış bir agregasyon konfigürasyonu milyonlarca satırı işledikten sonra anlamsız veri üretebilir.

curl -X POST "localhost:9200/_transform/_preview" 
  -H "Content-Type: application/json" 
  -d '{
    "source": {
      "index": ["nginx-access-*"],
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-1d"
          }
        }
      }
    },
    "pivot": {
      "group_by": {
        "hour_bucket": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1h"
          }
        },
        "status_code": {
          "terms": {
            "field": "http.response.status_code"
          }
        }
      },
      "aggregations": {
        "request_count": {
          "value_count": {
            "field": "_id"
          }
        },
        "avg_duration": {
          "avg": {
            "field": "http.response.duration"
          }
        }
      }
    }
  }'

Preview size hem örnek dokümanları hem de üretilecek mapping’i gösterir. generated_dest_index bölümünde Elasticsearch’ün otomatik olarak türeteceği mapping’i inceleyebilirsiniz.

Hedef İndeks Mapping’ini Önceden Tanımlamak

Elasticsearch’in otomatik mapping tespitine her zaman güvenemezsiniz. Özellikle bazı alanların integer yerine long, ya da double yerine float olarak tanımlanmasını istediğinizde explicit mapping oluşturun:

# Once hedef indeks mapping'ini olustur
curl -X PUT "localhost:9200/nginx-hourly-summary" 
  -H "Content-Type: application/json" 
  -d '{
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.codec": "best_compression"
    },
    "mappings": {
      "properties": {
        "hour_bucket": {
          "type": "date"
        },
        "status_code": {
          "type": "integer"
        },
        "upstream_host": {
          "type": "keyword"
        },
        "request_count": {
          "type": "long"
        },
        "avg_response_time": {
          "type": "float"
        },
        "p95_response_time.95": {
          "type": "float"
        },
        "total_bytes_sent": {
          "type": "long"
        },
        "max_response_time": {
          "type": "float"
        }
      }
    }
  }'

Hedef indeks daha önce oluşturulmuşsa Transform API bunu kullanır. Yoksa kendisi oluşturur. Ancak üretim ortamında her zaman mapping’i kendiniz tanımlamanızı öneririm. Bu sayede Kibana’da alan tiplerinden kaynaklanan sorunlarla uğraşmazsınız.

Transform Yönetimi için Faydalı Komutlar

Günlük operasyonlarda kullanacağınız komutları bir arada görelim:

# Tum transform'lari listele
curl -X GET "localhost:9200/_transform?pretty"

# Belirli bir transform'in detayli stats'ini goruntule
curl -X GET "localhost:9200/_transform/nginx_hourly_summary/_stats?pretty"

# Transform'i guncelle (once durdur)
curl -X POST "localhost:9200/_transform/nginx_hourly_summary/_stop"

curl -X PUT "localhost:9200/_transform/nginx_hourly_summary/_update" 
  -H "Content-Type: application/json" 
  -d '{
    "frequency": "15m",
    "settings": {
      "docs_per_second": 500
    },
    "description": "Guncellenmis nginx saatlik ozet"
  }'

# Guncelleme sonrasi yeniden baslat
curl -X POST "localhost:9200/_transform/nginx_hourly_summary/_start"

# Transform'i tamamen sil
curl -X DELETE "localhost:9200/_transform/nginx_hourly_summary"

# Checkpoint bilgisini sifirla ve bastan calistir
curl -X POST "localhost:9200/_transform/nginx_hourly_summary/_stop?wait_for_completion=true"
curl -X DELETE "localhost:9200/_transform/nginx_hourly_summary"
# Sonra yeniden olustur ve baslat

Performans ve Kaynak Yönetimi

Transform API cluster kaynaklarını önemli ölçüde tüketebilir. Özellikle ilk çalıştırmada büyük veri setlerini işlerken şu noktalara dikkat edin:

Shard allocation’a dikkat edin: Hedef indeks için number_of_shards değerini kaynak indeksle aynı tutmayın. Özet tablolar genellikle çok daha az veri üretir, 1-2 shard yeterlidir.

Transform scheduling’i cluster yüküne göre ayarlayın: frequency değerini iş yüküne göre belirleyin. 1 dakikalık interval, cluster zaten yoğunken ek baskı oluşturur.

docs_per_second throttle’ını kullanın: Özellikle backfill işlemlerinde bu değeri makul tutun. Cluster monitörlerinde search_rate ve indexing_rate artışlarını izleyin.

İndeks lifecycle yönetimini unutmayın: Hedef indeks de zamanla büyür. ILM policy tanımlamayı ihmal etmeyin.

Hedef indeksi sorgularken performansı artırmak için uygun field’lara index ekleyin:

# Sorgu performansini artirmak icin runtime field yerine
# materialized field kullanmayi tercih edin
curl -X PUT "localhost:9200/nginx-hourly-summary/_settings" 
  -H "Content-Type: application/json" 
  -d '{
    "index": {
      "refresh_interval": "30s",
      "number_of_replicas": 1
    }
  }'

Kibana ile Entegrasyon

Transform API ile oluşturduğunuz özet indekslerini Kibana’da kullanmak için bir data view oluşturmanız yeterli. Stack Management > Data Views yolunu takip edin ve nginx-hourly-summary için yeni bir data view tanımlayın.

Artık bu data view üzerinden oluşturacağınız visualization’lar, ham veri yerine önceden hesaplanmış metriklere bakacak. Özellikle avg_response_time gibi alanlar için Kibana’nın tekrar agregasyon yapmasına gerek kalmayacak, doğrudan max veya avg aggregation kullanarak doğru sonucu elde edeceksiniz.

Kibana’da Lens ile özet tabloyu kullanırken şu noktaya dikkat edin: Transform çıktısındaki değerler zaten önceden hesaplanmış. Kibana’da bu değerleri sum ile değil avg veya max ile agregasyon yapın. Aksi halde zaman aralığı daraltıldığında yanlış sonuçlar görebilirsiniz. Örneğin ortalama response time için p95_response_time.95 alanını avg ile kullanın.

Sonuç

Transform API, Elasticsearch’te veri yönetimini kökten değiştiren bir özelliktir. Doğru kullanıldığında hem sorgu performansınızı dramatik şekilde artırır hem de gereksiz cluster baskısından kurtulmanızı sağlar. Özellikle şu senaryolarda mutlaka transform oluşturun: saatlik veya günlük özet raporlar, anomali tespiti için baseline hesaplamaları ve yüksek kardinaliteli alan agregasyonları.

Başlangıç için şu sırlamayı öneririm: Önce _preview endpoint ile konfigürasyonunuzu test edin, sonra hedef indeks mapping’ini manuel tanımlayın, ardından batch transform ile geçmiş verilerinizi backfill edin, en son continuous transform’ı düşük bir docs_per_second değeriyle başlatıp cluster davranışını gözlemleyin. Cluster stabil çalışıyorsa throttle değerini kaldırabilirsiniz.

Unutmayın, transform’lar da bir indirekt maliyet taşır. Gereksiz yere transform oluşturmak cluster’ınızı yavaş yavaş yorar. Her transform için net bir kullanım amacı tanımlayın ve kullanılmayan transform’ları düzenli olarak temizleyin.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir