Logstash Grok Filter ile Log Parsing: Yapılandırılmamış Logları Anlamlı Veriye Dönüştürün
Logları ham halde bırakmak, karanlıkta fener olmadan yürümek gibidir. Elasticsearch’e akan her satır metin, doğru parse edilmediği sürece sadece yer kaplayan gürültüden ibarettir. Grok filter tam da bu noktada devreye giriyor ve yapılandırılmamış log verilerini anlamlı alanlara dönüştürüyor. Yıllarca farklı şirketlerde ELK stack kurup yönetirken gördüm ki çoğu ekip Logstash’i kuruyor, logları aktarıyor ama Grok’u yüzeysel kullanıyor. Bu yazıda sıfırdan gerçek dünya senaryolarına kadar Grok filter’ı derinlemesine ele alacağız.
Grok Nedir ve Neden Önemlidir?
Grok, Logstash içinde yerleşik gelen bir filter plugin’idir. Temelde düzenli ifadeler (regex) üzerine kurulu olmakla birlikte, Grok’u regex’ten ayıran şey önceden tanımlanmış pattern kütüphanesidir. %{IP:client_ip} yazdığınızda arka planda bir IP regex’i çalışır, ama siz bunu anlamlı bir isimle etiketlemiş olursunuz.
Bir web sunucusundan gelen şu log satırını düşünün:
192.168.1.45 - john [10/Nov/2023:14:32:11 +0300] "GET /api/users HTTP/1.1" 200 4823
Bu satırı Elasticsearch’e ham string olarak atmak yerine client_ip, username, timestamp, method, path, status_code, bytes alanlarına ayırabilirsiniz. Kibana’da “son 1 saatte 500 dönen endpoint’ler” sorgusunu yapabilmek için bu ayrımın yapılmış olması şarttır.
Temel Grok Syntax’ı
Grok pattern’larının yapısı şu şekildedir:
%{PATTERN_NAME:field_name}
%{PATTERN_NAME:field_name:data_type}
- PATTERN_NAME: Önceden tanımlı veya kendinizin yazdığı pattern
- field_name: Elasticsearch’e gönderilecek alan adı
- data_type: İsteğe bağlı,
intveyafloatolabilir
Basit bir örnekle başlayalım. Şöyle bir log satırımız olsun:
2023-11-10 14:32:11 ERROR DatabaseConnection: Timeout after 30s
Bu için Grok filtresi:
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level} %{DATA:component}: %{GREEDYDATA:error_message}"
}
}
}
Bu konfigürasyon çalıştığında Elasticsearch’e şu alanlar gider:
- log_timestamp:
2023-11-10 14:32:11 - log_level:
ERROR - component:
DatabaseConnection - error_message:
Timeout after 30s
Sık Kullanılan Built-in Pattern’lar
Logstash ile birlikte yüzlerce hazır pattern gelir. En çok işe yarayanlar:
- %{IP}: IPv4 ve IPv6 adresleri
- %{NUMBER}: Sayısal değerler
- %{WORD}: Boşluksuz tek kelime
- %{DATA}: Tembel eşleşme (mümkün olan en az karakteri alır)
- %{GREEDYDATA}: Açgözlü eşleşme (satır sonuna kadar her şeyi alır)
- %{TIMESTAMP_ISO8601}: ISO 8601 formatında zaman damgası
- %{LOGLEVEL}: DEBUG, INFO, WARN, ERROR, FATAL gibi log seviyeleri
- %{URIPATH}: URL path bileşeni
- %{HTTPD_COMMONLOG}: Apache/Nginx common log formatı için hazır pattern
- %{QS}: Tırnak içindeki string (quoted string)
Nginx Access Log Parse Etmek
Gerçek dünyada en çok karşılaşılan senaryo web sunucu loglarıdır. Bir production ortamında Nginx access log’larını parse eden tam bir Logstash konfigürasyonu:
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb_nginx"
tags => ["nginx", "access"]
}
}
filter {
if "nginx" in [tags] and "access" in [tags] {
grok {
match => {
"message" => "%{IPORHOST:client_ip} - %{DATA:auth_user} [%{HTTPDATE:request_time}] "%{WORD:http_method} %{URIPATHPARAM:request_path} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code:int} %{NUMBER:bytes_sent:int} "%{DATA:http_referrer}" "%{GREEDYDATA:user_agent}""
}
remove_field => ["message"]
}
date {
match => ["request_time", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
mutate {
convert => {
"response_code" => "integer"
"bytes_sent" => "integer"
}
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
}
Burada dikkat etmeniz gereken birkaç nokak var. Köşeli parantez ve tırnak işaretleri gibi özel karakterlerin önüne koymak gerekiyor. response_code ve bytes_sent alanlarını integer’a çeviriyoruz çünkü Kibana’da sayısal filtreleme yapabilmek için bu şart.
Özel Pattern Tanımlamak
Bazen hazır pattern’lar yetmez. Şirketin kendi geliştirdiği uygulamaların logları, özel formatlarda gelir. Bu durumda kendi pattern’larınızı tanımlamanız gerekir.
Diyelim ki uygulamanız şöyle loglar üretiyor:
[TXN-20231110-8847263] PaymentService - Processing payment for user_id=45892 amount=1250.00 currency=TRY status=PENDING
Bunun için özel pattern dosyası oluşturun:
# /etc/logstash/patterns/custom_patterns dosyası
TRANSACTION_ID TXN-[0-9]{8}-[0-9]{7}
CURRENCY [A-Z]{3}
PAYMENT_STATUS PENDING|PROCESSING|COMPLETED|FAILED|REFUNDED
Logstash konfigürasyonunda bu pattern’ları kullanmak için:
filter {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => {
"message" => "[%{TRANSACTION_ID:transaction_id}] %{WORD:service_name} - %{DATA:action} user_id=%{NUMBER:user_id:int} amount=%{NUMBER:amount:float} currency=%{CURRENCY:currency} status=%{PAYMENT_STATUS:payment_status}"
}
}
}
Bu pattern tanımları sayesinde transaction_id, service_name, user_id, amount, currency ve payment_status alanları temiz bir şekilde Elasticsearch’e gider. Kibana’da “FAILED status’ündeki ödemeler” veya “TRY cinsinden 1000 TL üzeri işlemler” gibi sorgular anlık çalışır hale gelir.
Birden Fazla Log Formatını Aynı Anda İşlemek
Bir mikro servis ortamında farklı servisler farklı log formatları üretir. Tek bir pipeline’da hepsini yönetmek için match array’i ve break_on_match parametresini kullanabilirsiniz:
filter {
grok {
match => {
"message" => [
"[%{TRANSACTION_ID:transaction_id}] %{WORD:service_name} - %{DATA:action} user_id=%{NUMBER:user_id:int}",
"%{TIMESTAMP_ISO8601:log_timestamp} [%{LOGLEVEL:log_level}] %{JAVACLASS:java_class} - %{GREEDYDATA:log_message}",
"%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{NOTSPACE:request}" %{NUMBER:response:int} (?:%{NUMBER:bytes:int}|-)"
]
}
break_on_match => true
}
}
break_on_match => true (varsayılan değerdir) ilk başarılı eşleşmede durur. Eğer tüm pattern’ların denenmesini istiyorsanız false yapın ama bu performansı etkiler.
Grok Hata Ayıklama: _grokparsefailure
En sinir bozucu durum: konfigürasyonu yazarsınız, Logstash’i başlatırsınız ve Kibana’da _grokparsefailure tag’li kayıtlar görmeye başlarsınız. Bu, Grok’un log satırını parse edemediği anlamına gelir.
Hata ayıklama için önce Grok Debugger kullanın. Kibana’nın Dev Tools bölümünde veya https://grokdebugger.com adresinde pattern’ınızı test edebilirsiniz. Ama production’da gerçek zamanlı debug için şu konfigürasyonu kullanın:
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:log_message}"
}
tag_on_failure => ["_grokparsefailure", "needs_review"]
add_field => {
"parse_status" => "failed"
}
}
if "_grokparsefailure" in [tags] {
mutate {
add_field => {
"raw_message_backup" => "%{message}"
}
}
}
}
output {
if "_grokparsefailure" in [tags] {
file {
path => "/var/log/logstash/parse_failures_%{+YYYY-MM-dd}.log"
codec => line {
format => "%{message}"
}
}
}
elasticsearch {
hosts => ["localhost:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}
Bu konfigürasyon parse edilemeyen satırları hem Elasticsearch’e hem de ayrı bir dosyaya yazar. Dosyayı inceleyerek pattern’ı neden eşleşmediğini anlayabilirsiniz.
Java Stack Trace’lerini İşlemek
Java uygulamaları çalıştırıyorsanız çok satırlı stack trace logları kaçınılmazdır. Grok tek başına bunu çözemez, multiline codec ile birlikte kullanılması gerekir:
input {
file {
path => "/var/log/app/application.log"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
filter {
grok {
match => {
"message" => "(?m)%{TIMESTAMP_ISO8601:log_timestamp}s+%{LOGLEVEL:log_level}s+%{JAVACLASS:logger_name}s+-s+%{DATA:log_message}(?:n%{GREEDYDATA:stack_trace})?"
}
}
if [stack_trace] {
mutate {
add_tag => ["has_exception"]
gsub => [
"stack_trace", "t", " "
]
}
}
}
(?m) flag’i regex’in çok satırlı çalışmasını sağlar. negate => true ile TIMESTAMP_ISO8601 ile başlamayan satırlar bir öncekine eklenir. Bu sayede tüm stack trace tek bir Elasticsearch dokümanı olarak saklanır.
Performans Optimizasyonu
Grok regex tabanlı olduğu için yoğuk trafikte CPU darboğazı yaratabilir. Bunu önlemek için:
Pattern Sırasına Dikkat Edin
filter {
grok {
match => {
"message" => [
"%{MOST_COMMON_PATTERN}",
"%{LESS_COMMON_PATTERN}",
"%{RARE_PATTERN}"
]
}
}
}
En sık eşleşen pattern’ı başa koyun. Logstash ilk eşleşmede durur, gereksiz regex işlemi yapmamış olur.
Anchor Kullanın
filter {
grok {
match => {
"message" => "^%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:log_message}$"
}
}
}
^ ve $ anchor’ları regex engine’in gereksiz pozisyonları denemesini önler. Özellikle uzun log satırlarında ciddi performans farkı yaratır.
If Koşullarıyla Ön Filtreleme Yapın
filter {
if [type] == "nginx" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
} else if [type] == "java_app" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level}" }
}
}
}
Her log satırı için her pattern’ı denemek yerine, log tipine göre sadece ilgili Grok bloğunu çalıştırın.
Gerçek Dünya Senaryosu: E-ticaret Log Analizi
Bir e-ticaret şirketinde çalıştığımı varsayalım. Farklı servislerden gelen logları birleştirip anlamlı metrikler çıkarmak istiyoruz. Order service şöyle loglar üretiyor:
2023-11-10T14:32:11.456Z INFO order-service [req-id=abc123] Order created: order_id=ORD-789456 user_id=45892 total=2350.00 item_count=3 warehouse=IST-01
Bu log için kapsamlı bir Grok konfigürasyonu:
filter {
if [fields][service] == "order-service" {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => {
"message" => "%{TIMESTAMP_ISO8601:event_timestamp} %{LOGLEVEL:log_level} %{WORD:service_name} [req-id=%{DATA:request_id}] Order %{WORD:order_action}: order_id=%{DATA:order_id} user_id=%{NUMBER:user_id:int} total=%{NUMBER:order_total:float} item_count=%{NUMBER:item_count:int} warehouse=%{DATA:warehouse_code}"
}
}
mutate {
add_field => {
"event_type" => "order_%{order_action}"
"is_high_value" => "false"
}
}
if [order_total] and [order_total] > 1000 {
mutate {
update => { "is_high_value" => "true" }
add_tag => ["high_value_order"]
}
}
date {
match => ["event_timestamp", "ISO8601"]
target => "@timestamp"
remove_field => ["event_timestamp"]
}
}
}
Bu konfigürasyon sayesinde Kibana’da şu soruların cevabını saniyeler içinde bulabilirsiniz: Bugün kaç sipariş oluşturuldu? Hangi depoda en çok sipariş var? 1000 TL üzeri siparişlerin oranı nedir? Hangi kullanıcılar en çok sipariş veriyor?
Sonuç
Grok filter, ELK stack’in kalbindeki dönüşüm mekanizmasıdır. Ham log satırlarını yapılandırılmış veriye çevirme işinin %80’ini Grok üstlenir. Doğru yazılmış bir Grok konfigürasyonu, on binlerce log satırını Kibana’da anlamlı dashboard’lara dönüştürür.
Pratik öneriler:
- Yeni bir log formatıyla karşılaştığınızda önce Grok Debugger’da test edin, direkt production’a sürmeyin
- Pattern’larınızı ayrı dosyalarda tutun, konfigürasyon dosyasını şişirmeyin
_grokparsefailureoranını Kibana’da izleyin, %5’in üzerine çıkıyorsa pattern’larınızı gözden geçirin- Çok karmaşık tek bir Grok pattern yazmak yerine birden fazla basit adıma bölün, okunabilirlik ve bakım kolaylığı açısından çok daha iyidir
- Java veya .NET uygulamaları için mutlaka
multilinecodec’i konfigürasyona ekleyin, aksi halde stack trace’ler parçalanır
ELK stack’i kuran ama Grok’u yüzeysel geçen ekipler, aslında sistemin en değerli kısmını kullanmamış oluyor. Loglardan gerçek değer üretmek Grok ile başlar.
