Elasticsearch Ingest Pipeline ile Veri Dönüşümü ve Zenginleştirme
Elasticsearch’te veriyi indekslemeden önce işlemek, temizlemek ve zenginleştirmek için en güçlü araçlardan biri olan Ingest Pipeline, özellikle production ortamlarında fark yaratan bir özellik. Logstash’e gerek kalmadan doğrudan Elasticsearch içinde veri dönüşümü yapabilmek, hem altyapı karmaşıklığını azaltıyor hem de performans açısından ciddi avantajlar sağlıyor. Bu yazıda gerçek dünya senaryoları üzerinden Ingest Pipeline’ı derinlemesine inceleyeceğiz.
Ingest Pipeline Nedir ve Ne Zaman Kullanılır
Elasticsearch Ingest Pipeline, belgeleri bir index’e yazmadan önce üzerinde çeşitli işlemler yapmanızı sağlayan bir mekanizmadır. Her pipeline bir veya birden fazla processor içerir ve bu processor’lar sırayla çalışır.
Şu durumlarda Ingest Pipeline tercih edilmeli:
- Logstash veya başka bir ETL aracı kurmak istemiyorsanız
- Basit veri dönüşümleri için ekstra bileşen overhead’ı istemiyorsanız
- Elasticsearch cluster’ınız zaten var ve veriyi kaynakta temizlemek istiyorsanız
- Beats (Filebeat, Metricbeat) ile doğrudan Elasticsearch’e veri gönderiyorsanız
Özellikle Filebeat + Ingest Pipeline kombinasyonu, küçük ve orta ölçekli log yönetimi altyapıları için oldukça yaygın ve pratik bir çözüm.
Temel Pipeline Yapısı
Bir pipeline tanımının iskeletini görelim:
curl -X PUT "localhost:9200/_ingest/pipeline/my-first-pipeline"
-H 'Content-Type: application/json'
-d '{
"description": "Ilk test pipeline",
"processors": [
{
"set": {
"field": "environment",
"value": "production"
}
},
{
"lowercase": {
"field": "message"
}
}
]
}'
Bu basit örnekte iki processor var:
- set: Her belgeye sabit bir
environmentalanı ekler - lowercase:
messagealanının içeriğini küçük harfe çevirir
Pipeline’ı test etmek için _simulate endpoint’ini kullanabilirsiniz:
curl -X POST "localhost:9200/_ingest/pipeline/my-first-pipeline/_simulate"
-H 'Content-Type: application/json'
-d '{
"docs": [
{
"_source": {
"message": "Bu Bir TEST Mesajidir",
"timestamp": "2024-01-15T10:30:00Z"
}
}
]
}'
_simulate endpoint’i production’a bir şey yazmadan pipeline’ınızın nasıl davranacağını görmenizi sağlar. Geliştirme sürecinde bu endpoint hayat kurtarır.
Gerçek Dünya Senaryosu 1: Nginx Access Log İşleme
Diyelim ki Filebeat ile Nginx access log’larını topluyorsunuz ve ham log satırlarını parse edip anlamlı alanlara ayırmak istiyorsunuz. Gelen veri şöyle:
192.168.1.100 - john [15/Jan/2024:10:30:00 +0300] "GET /api/users HTTP/1.1" 200 1234
Bu veriyi işleyecek pipeline:
curl -X PUT "localhost:9200/_ingest/pipeline/nginx-access-logs"
-H 'Content-Type: application/json'
-d '{
"description": "Nginx access log parser",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{IPORHOST:client_ip} - %{USER:auth_user} \[%{HTTPDATE:timestamp}\] "%{WORD:http_method} %{URIPATH:request_path} HTTP/%{NUMBER:http_version}" %{NUMBER:status_code:int} %{NUMBER:bytes_sent:int}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
"target_field": "@timestamp"
}
},
{
"geoip": {
"field": "client_ip",
"target_field": "geo",
"ignore_missing": true
}
},
{
"remove": {
"field": ["timestamp", "message"],
"ignore_missing": true
}
}
]
}'
Bu pipeline şunları yapıyor:
- grok processor: Ham log satırını regex benzeri pattern’larla parse ediyor, IP, kullanıcı adı, HTTP metodu, URL, status kodu ve byte boyutunu ayrı alanlara çıkarıyor
- date processor: Nginx formatındaki tarihi Elasticsearch’in anlayacağı
@timestampformatına çeviriyor - geoip processor: IP adresinden coğrafi konum bilgisi ekliyor (Kibana haritaları için kritik)
- remove processor: Artık gereksiz olan ham alanları temizliyor
GeoIP processor’ın çalışması için Elasticsearch’in GeoIP veritabanını indirmesi gerekir. Bunu kontrol etmek için:
curl -X GET "localhost:9200/_ingest/geoip/stats"
Gerçek Dünya Senaryosu 2: E-Ticaret Sipariş Verisi Zenginleştirme
Bir e-ticaret platformunda sipariş verilerini indekslerken bazı türetilmiş alanlar eklemeniz gerekiyor. Örneğin toplam tutarı hesaplamak, sipariş büyüklüğünü kategorize etmek ve işlem zamanını normalize etmek gibi.
curl -X PUT "localhost:9200/_ingest/pipeline/ecommerce-orders"
-H 'Content-Type: application/json'
-d '{
"description": "E-ticaret siparis zenginlestirme pipeline",
"processors": [
{
"script": {
"description": "Toplam tutari hesapla",
"lang": "painless",
"source": "ctx.total_amount = ctx.unit_price * ctx.quantity"
}
},
{
"script": {
"description": "Siparis buyukluğunu kategorize et",
"lang": "painless",
"source": """
if (ctx.total_amount < 100) {
ctx.order_category = "small";
} else if (ctx.total_amount < 500) {
ctx.order_category = "medium";
} else {
ctx.order_category = "large";
}
"""
}
},
{
"set": {
"field": "processed_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"convert": {
"field": "quantity",
"type": "integer"
}
}
]
}'
script processor en güçlü ama aynı zamanda en dikkatli kullanılması gereken processor’dır. Painless script ile neredeyse her türlü hesaplama yapabilirsiniz, ancak karmaşık scriptler cluster performansını etkileyebilir. Production’da script processor kullanıyorsanız, muhakkak yük testleri yapın.
Pipeline Zincirleme: Pipeline’lar Arası Geçiş
Büyük ve karmaşık dönüşümler için tek bir büyük pipeline yerine modüler pipeline’lar oluşturmak hem yönetim hem de hata ayıklama açısından çok daha pratik.
# Ana koordinator pipeline
curl -X PUT "localhost:9200/_ingest/pipeline/main-coordinator"
-H 'Content-Type: application/json'
-d '{
"description": "Ana koordinator - log tipine gore yonlendir",
"processors": [
{
"set": {
"field": "ingest_time",
"value": "{{_ingest.timestamp}}"
}
},
{
"pipeline": {
"if": "ctx.log_type == '''nginx'''",
"name": "nginx-access-logs"
}
},
{
"pipeline": {
"if": "ctx.log_type == '''application'''",
"name": "application-logs"
}
}
]
}'
Bu yaklaşımın avantajları:
- Her log tipi için ayrı pipeline bakımı yapabilirsiniz
- Bir pipeline’da hata olduğunda diğerleri etkilenmez
- Yeni log tipi eklemek için sadece yeni pipeline oluşturup koordinatöre bir satır eklemeniz yeterli
Conditional (Koşullu) Processor Kullanımı
Her processor’a if koşulu ekleyerek seçici dönüşümler yapabilirsiniz:
curl -X PUT "localhost:9200/_ingest/pipeline/smart-log-processor"
-H 'Content-Type: application/json'
-d '{
"description": "Akilli log isleme",
"processors": [
{
"uppercase": {
"if": "ctx.severity == '''critical'''",
"field": "message"
}
},
{
"set": {
"if": "ctx.status_code != null && ctx.status_code >= 500",
"field": "alert_required",
"value": true
}
},
{
"remove": {
"if": "ctx.debug_info != null && ctx.environment == '''production'''",
"field": "debug_info"
}
}
]
}'
Bu örnekte:
- Sadece
criticalseverity’li loglar büyük harfe çevriliyor - 500 ve üzeri HTTP hataları için
alert_requiredalanı ekleniyor - Production ortamında debug bilgileri temizleniyor
Hata Yönetimi: on_failure ile Güvenli Pipeline
Production’da en sık karşılaşılan sorunlardan biri, beklenmedik formattaki verinin pipeline’ı patlatması. on_failure mekanizmasıyla bu durumu zarif bir şekilde yönetebilirsiniz:
curl -X PUT "localhost:9200/_ingest/pipeline/resilient-pipeline"
-H 'Content-Type: application/json'
-d '{
"description": "Hata toleransli pipeline",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{COMBINEDAPACHELOG}"],
"on_failure": [
{
"set": {
"field": "parse_error",
"value": "Grok parse basarisiz: {{_ingest.on_failure_message}}"
}
},
{
"set": {
"field": "parse_status",
"value": "failed"
}
}
]
}
}
],
"on_failure": [
{
"set": {
"field": "pipeline_error",
"value": "Pipeline hatasi: {{_ingest.on_failure_message}}"
}
},
{
"set": {
"field": "_index",
"value": "failed-documents"
}
}
]
}'
İki seviye hata yönetimi var burada:
- Processor seviyesinde on_failure: Grok parse başarısız olursa hata mesajını kaydet, ama belgeyi indekslemeye devam et
- Pipeline seviyesinde on_failure: Hiçbir şey işe yaramazsa belgeyi
failed-documentsindex’ine yönlendir
failed-documents index’ini düzenli takip edin. Oraya düşen belgeler genellikle veri kalitesi problemlerinin ilk işaretidir.
Enrich Processor ile Lookup Tabloları
Bir IP adresine karşılık gelen müşteri bilgisini veya bir ürün ID’sine karşılık gelen kategori bilgisini başka bir index’ten çekerek belgenizi zenginleştirebilirsiniz. Bunun için önce bir enrich policy oluşturmanız gerekir:
# Once kaynak index'e veri ekleyin
curl -X POST "localhost:9200/customer-registry/_doc"
-H 'Content-Type: application/json'
-d '{
"customer_ip": "192.168.1.100",
"customer_name": "Ahmet Yilmaz",
"customer_segment": "premium",
"account_manager": "Mehmet Kaya"
}'
# Enrich policy olustur
curl -X PUT "localhost:9200/_enrich/policy/customer-lookup"
-H 'Content-Type: application/json'
-d '{
"match": {
"indices": "customer-registry",
"match_field": "customer_ip",
"enrich_fields": ["customer_name", "customer_segment", "account_manager"]
}
}'
# Policy'i execute et (enrich index olusturur)
curl -X POST "localhost:9200/_enrich/policy/customer-lookup/_execute"
Şimdi bu policy’yi pipeline’da kullanabilirsiniz:
curl -X PUT "localhost:9200/_ingest/pipeline/web-log-with-customer"
-H 'Content-Type: application/json'
-d '{
"description": "Web logu musteri bilgisiyle zenginlestir",
"processors": [
{
"enrich": {
"policy_name": "customer-lookup",
"field": "client_ip",
"target_field": "customer",
"ignore_missing": true
}
}
]
}'
Artık her log kaydına otomatik olarak müşteri adı, segment ve hesap yöneticisi bilgisi ekleniyor. Bu özellik özellikle B2B SaaS ürünlerinde müşteri bazlı log analizi yaparken inanılmaz faydalı.
Pipeline’ı Index’e Bağlama
Pipeline’ı her istek için manuel belirtmek yerine index template veya index ayarlarıyla varsayılan pipeline tanımlayabilirsiniz:
# Index template ile varsayilan pipeline tanimla
curl -X PUT "localhost:9200/_index_template/nginx-logs-template"
-H 'Content-Type: application/json'
-d '{
"index_patterns": ["nginx-logs-*"],
"template": {
"settings": {
"default_pipeline": "nginx-access-logs",
"final_pipeline": "audit-trail-pipeline"
}
}
}'
- default_pipeline: Kullanıcı farklı bir pipeline belirtmediği sürece bu pipeline çalışır
- final_pipeline: Her zaman çalışır, kullanıcı farklı bir pipeline belirlese bile. Audit log, timestamp normalizasyonu gibi zorunlu işlemler için idealdir
Pipeline Performans İzleme
Pipeline’larınızın nasıl performans gösterdiğini izlemek için:
curl -X GET "localhost:9200/_nodes/stats/ingest?pretty"
Bu endpoint size şunları verir:
- Her pipeline için toplam işlenen belge sayısı
- Başarısız belge sayısı
- Toplam işlem süresi (milisaniye)
- Her processor’ın ayrı ayrı istatistikleri
Yüksek işlem süresi gördüğünüzde suçlu processor’ı tespit etmek için:
curl -X GET "localhost:9200/_nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines"
-H 'Content-Type: application/json'
Eğer bir processor sürekli yüksek süre gösteriyorsa, genellikle şu nedenlerden biridir:
- Karmaşık Painless script
- GeoIP veritabanı güncel değil
- Enrich index çok büyük veya optimize edilmemiş
Filebeat ile Pipeline Entegrasyonu
Filebeat kullanıyorsanız pipeline’ı doğrudan Filebeat konfigürasyonunda belirtebilirsiniz:
# filebeat.yml
output.elasticsearch:
hosts: ["localhost:9200"]
pipeline: "nginx-access-logs"
index: "nginx-logs-%{+yyyy.MM.dd}"
Ya da Filebeat modüllerini kullanıyorsanız, her modülün zaten built-in pipeline’ları vardır. Örneğin nginx modülü aktif edildiğinde otomatik olarak uygun pipeline’lar oluşturulur:
filebeat modules enable nginx
filebeat setup --pipelines
Bu komut Filebeat’in Nginx modülü için önceden hazırlanmış pipeline’ları Elasticsearch’e yükler. Kendi pipeline’ınızı sıfırdan yazmak zorunda kalmadan standart log formatları için hazır çözümler sunar.
Pratik İpuçları ve Dikkat Edilmesi Gerekenler
Pipeline geliştirirken aşağıdaki noktalara dikkat edin:
- Her zaman _simulate ile test edin: Production’a almadan önce mutlaka gerçek veri örnekleriyle test edin
- ignore_missing kullanın: Alanın her belgede bulunacağını garanti edemiyorsanız, processor’a
"ignore_missing": trueekleyin - Pipeline versiyonlama: Pipeline isimlerine versiyon ekleyin (
nginx-logs-v2) ve eski versiyonu hemen silmeyin - Grok pattern kütüphanesi: Grok için
grok.elastic.coadresinde yüzlerce hazır pattern bulunuyor - Script processor’da ctx değişikliği: Painless script’te
ctxüzerinde değişiklik yapıyorsanız,ctx.containsKey("field")ile önce alanın varlığını kontrol edin
Sonuç
Elasticsearch Ingest Pipeline, veri işleme pipeline’ınızı sadeleştirmek için son derece güçlü bir araç. Basit alan dönüşümlerinden karmaşık lookup zenginleştirmelerine kadar geniş bir yelpazede kullanılabilir. Özellikle Logstash gibi ayrı bir bileşen yönetmek istemediğiniz veya Beats doğrudan Elasticsearch’e veri gönderdiği senaryolarda bu özellik gerçekten değer katıyor.
Dikkat edilmesi gereken en önemli nokta performans: Her processor CPU ve bellek tüketir. Grok ve özellikle Painless script processor’larını aşırı kullanmak, yüksek veri hacimlerinde cluster’ınızı zorlayabilir. Production’a almadan önce temsili veri yükleriyle mutlaka performans testleri yapın.
Pipeline’larınızı modüler tutun, hata yönetimini ihmal etmeyin ve _nodes/stats/ingest endpoint’ini monitoring dashboard’larınıza ekleyin. Bu üç alışkanlık, Ingest Pipeline tabanlı bir altyapıyı uzun vadede sorunsuz yönetmenizi sağlar.
