Logstash ile Veri Toplama ve Elasticsearch’e Gönderme

Elasticsearch kurulumunu tamamladın, index’leri oluşturdun, cluster sağlıklı çalışıyor. Peki ya veriler? Sunucularından, uygulamalarından, network cihazlarından akan log’ları Elasticsearch’e nasıl aktaracaksın? İşte tam bu noktada Logstash devreye giriyor. ELK Stack’in “L”si olan Logstash, veriyi alır, işler ve istediğin hedefe gönderir. Bu yazıda gerçek dünya senaryoları üzerinden Logstash pipeline’larını kuracak, optimize edecek ve production ortamında kullanılabilir hale getireceğiz.

Logstash Nedir ve Neden Kullanırız

Logstash özünde bir ETL (Extract, Transform, Load) aracıdır. Ham veriyi alır, anlam kazandırır ve hedefine gönderir. Sadece Elasticsearch ile çalışmaz; Kafka’ya, S3’e, dosyaya, hatta başka bir Logstash instance’ına veri gönderebilir. Ama biz bugün Elasticsearch entegrasyonuna odaklanacağız.

Neden Filebeat veya direkt API yerine Logstash kullanıyoruz diye sorabilirsin. Cevap basit: veri dönüşümü. Nginx log’undaki IP adresini GeoIP verisine çevirmek, Apache log satırını parse edip anlamlı field’lara bölmek, hassas verileri maskelemek, birden fazla kaynaktan gelen veriyi normalize etmek. Bunların hepsini Logstash pipeline içinde halledersin.

Kurulum

Debian/Ubuntu üzerinde kuruluma başlayalım. Önce Java gereksinimi:

# Java kurulumu (Logstash 8.x için Java 11 veya 17 gerekli)
sudo apt update
sudo apt install -y openjdk-17-jdk

# Java versiyonunu kontrol et
java -version

# Elastic GPG key ve repository ekle
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg

echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list

# Logstash kur
sudo apt update
sudo apt install -y logstash

# Servisi etkinleştir
sudo systemctl enable logstash
sudo systemctl start logstash

RHEL/CentOS için:

# RPM repository ekle
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

cat > /etc/yum.repos.d/logstash.repo << 'EOF'
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

sudo yum install -y logstash
sudo systemctl enable --now logstash

Kurulum sonrası önemli dizinler:

  • /etc/logstash/: Ana konfigürasyon dizini
  • /etc/logstash/conf.d/: Pipeline konfigürasyon dosyaları buraya gider
  • /var/log/logstash/: Log dosyaları
  • /usr/share/logstash/: Uygulama dosyaları
  • /var/lib/logstash/: Veri ve queue dosyaları

Pipeline Mimarisi: Input, Filter, Output

Her Logstash pipeline üç bölümden oluşur. Bunu bir fabrika bandı gibi düşün: hammadde girer (input), işlenir (filter), ürün çıkar (output).

Temel Pipeline Yapısı

# /etc/logstash/conf.d/basic-pipeline.conf

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  # Boş satırları atla
  if [message] =~ /^s*$/ {
    drop { }
  }

  # Timestamp ekle
  mutate {
    add_field => {
      "pipeline_timestamp" => "%{@timestamp}"
      "logstash_host" => "${HOSTNAME}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    user => "logstash_internal"
    password => "${LOGSTASH_ES_PASSWORD}"
    ssl_certificate_authorities => "/etc/logstash/certs/ca.crt"
    index => "logs-%{+YYYY.MM.dd}"
  }
}

Konfigürasyonu test et:

# Syntax kontrolü
sudo -u logstash /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/basic-pipeline.conf

# Çıktı başarılıysa şunu görürsün:
# Configuration OK

Gerçek Dünya Senaryosu 1: Nginx Access Log’larını Parse Etmek

En yaygın kullanım senaryolarından biri web sunucu log’larını işlemek. Ham Nginx log satırı şöyle görünür:

192.168.1.100 - john [10/Oct/2024:13:55:36 +0300] "GET /api/users HTTP/1.1" 200 2326 "https://example.com" "Mozilla/5.0..."

Bu satırı anlamlı field’lara bölmek için Grok filter kullanıyoruz:

# /etc/logstash/conf.d/nginx-pipeline.conf

input {
  beats {
    port => 5044
    type => "nginx_access"
  }
  # Alternatif: Doğrudan dosyadan oku
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    type => "nginx_access"
    sincedb_path => "/var/lib/logstash/sincedb_nginx"
  }
}

filter {
  if [type] == "nginx_access" {
    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
      }
      overwrite => ["message"]
    }

    # Timestamp'i düzgün parse et
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
      remove_field => ["timestamp"]
    }

    # Response code'u integer'a çevir
    mutate {
      convert => {
        "response" => "integer"
        "bytes" => "integer"
      }
    }

    # GeoIP bilgisi ekle
    geoip {
      source => "clientip"
      target => "geoip"
      add_field => {
        "[geoip][location_type]" => "Point"
      }
    }

    # User agent parse et
    useragent {
      source => "agent"
      target => "user_agent"
    }

    # 4xx ve 5xx hataları işaretle
    if [response] >= 400 {
      mutate {
        add_tag => ["error_response"]
        add_field => { "is_error" => "true" }
      }
    }

    # Gereksiz field'ları temizle
    mutate {
      remove_field => ["host", "agent", "input", "log"]
    }
  }
}

output {
  if [type] == "nginx_access" {
    elasticsearch {
      hosts => ["https://es-node1:9200", "https://es-node2:9200"]
      user => "logstash_writer"
      password => "${ES_PASSWORD}"
      ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
      index => "nginx-access-%{+YYYY.MM}"
      # ILM kullanıyorsan
      ilm_rollover_alias => "nginx-access"
      ilm_pattern => "000001"
      ilm_policy => "nginx-logs-policy"
      template_name => "nginx-access-template"
    }
  }
}

Gerçek Dünya Senaryosu 2: Çoklu Kaynak ve Koşullu Yönlendirme

Production ortamında genellikle farklı kaynaklardan veri alıp farklı index’lere yönlendirmen gerekir. Uygulama log’ları, sistem log’ları ve güvenlik log’larının hepsi farklı index’lere gitmeli:

# /etc/logstash/conf.d/multi-source-pipeline.conf

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "peer"
    codec => json
  }

  tcp {
    port => 5000
    codec => json_lines
    type => "syslog"
  }

  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["app-logs", "security-events"]
    group_id => "logstash-consumer-group"
    auto_offset_reset => "earliest"
    codec => json
    consumer_threads => 4
    type => "kafka_input"
  }
}

filter {
  # Beats'ten gelen Filebeat verisi için
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "application" {
      mutate { add_field => { "[@metadata][target_index]" => "app-logs" } }

      # JSON formatındaki application log'larını parse et
      json {
        source => "message"
        target => "app"
        skip_on_invalid_json => true
      }

      # Hata seviyelerine göre tag ekle
      if [app][level] == "ERROR" or [app][level] == "FATAL" {
        mutate { add_tag => ["alert_needed"] }
      }
    }

    if [fields][log_type] == "security" {
      mutate { add_field => { "[@metadata][target_index]" => "security-events" } }

      # SSH brute force tespiti
      if [message] =~ /Failed password/ {
        grok {
          match => {
            "message" => "Failed password for %{USER:failed_user} from %{IP:source_ip}"
          }
        }
        mutate { add_tag => ["brute_force_attempt"] }
        geoip { source => "source_ip" }
      }
    }
  }

  # Syslog için
  if [type] == "syslog" {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}"
      }
    }
    mutate { add_field => { "[@metadata][target_index]" => "syslog" } }
    date {
      match => ["syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://es-master:9200"]
    user => "logstash_writer"
    password => "${ES_PASSWORD}"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
    action => "index"
  }

  # Kritik hataları ayrıca stdout'a yaz (debug için)
  if "alert_needed" in [tags] {
    stdout {
      codec => rubydebug
    }
  }
}

Logstash Güvenlik Ayarları

Production’da güvenlik ihmal edilemez. Özellikle Elasticsearch 8.x ile SSL zorunlu:

# Logstash için özel kullanıcı oluştur (Elasticsearch Kibana'dan veya API ile)
# Kibana'dan: Management > Security > Users

# API ile kullanıcı oluşturma
curl -X POST "https://localhost:9200/_security/user/logstash_writer" 
  -H "Content-Type: application/json" 
  -u elastic:changeme 
  --cacert /etc/elasticsearch/certs/ca.crt 
  -d '{
    "password" : "logstash_strong_password_123!",
    "roles" : ["logstash_writer"],
    "full_name" : "Logstash Writer User"
  }'

# logstash_writer rolünü oluştur
curl -X POST "https://localhost:9200/_security/role/logstash_writer" 
  -H "Content-Type: application/json" 
  -u elastic:changeme 
  --cacert /etc/elasticsearch/certs/ca.crt 
  -d '{
    "cluster": ["monitor", "manage_index_templates", "manage_ilm"],
    "indices": [
      {
        "names": ["logs-*", "nginx-*", "app-*", "security-*", "syslog-*"],
        "privileges": ["write", "create", "create_index", "manage", "auto_configure"]
      }
    ]
  }'

Logstash keystore ile şifreleri güvenli sakla:

# Keystore oluştur
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create

# Şifreleri keystore'a ekle
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add ES_PASSWORD
# Şifreyi interaktif olarak gir

sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_ES_PASSWORD

# Keystore içeriğini listele
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash list

Performance Tuning

Logstash varsayılan ayarlarla production yükünü kaldıramaz. /etc/logstash/logstash.yml dosyasını düzenle:

# /etc/logstash/logstash.yml

# Worker thread sayısı (CPU core sayısına eşit veya 2x)
pipeline.workers: 8

# Her worker'ın batch'te işleyeceği event sayısı
pipeline.batch.size: 500

# Batch için bekleme süresi (ms)
pipeline.batch.delay: 50

# Persistent queue aktif et (veri kaybını önler)
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
path.queue: /var/lib/logstash/queue

# Dead letter queue (parse edilemeyen event'lar için)
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
path.dead_letter_queue: /var/lib/logstash/dead_letter_queue

# Log seviyesi
log.level: warn

JVM heap ayarları için /etc/logstash/jvm.options:

# /etc/logstash/jvm.options
# Sunucu RAM'inin 4'te 1'i ile yarısı arası iyi bir başlangıç
-Xms2g
-Xmx2g

# G1GC kullan (büyük heap için daha iyi)
-XX:+UseG1GC
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30

Monitoring ve Sorun Giderme

Pipeline sağlığını izlemek için Logstash API’sini kullanabilirsin:

# Pipeline istatistiklerini görüntüle
curl -s http://localhost:9600/_node/stats/pipelines | python3 -m json.tool

# Event işleme hızını kontrol et
curl -s http://localhost:9600/_node/stats | 
  python3 -c "import sys, json; d=json.load(sys.stdin); 
  print('Events in:', d['pipeline']['events']['in']); 
  print('Events out:', d['pipeline']['events']['out']); 
  print('Events filtered:', d['pipeline']['events']['filtered'])"

# Plugin istatistikleri
curl -s http://localhost:9600/_node/stats/pipelines?pretty

# Logstash hot threads (performans sorunu varsa)
curl -s http://localhost:9600/_node/hot_threads

# Logstash canlı log takibi
sudo journalctl -u logstash -f

# Hata loglarını filtrele
sudo grep -i "error|warn|exception" /var/log/logstash/logstash-plain.log | tail -50

Dead letter queue’daki başarısız event’ları işlemek için:

# Dead letter queue'yu monitor et
ls -lh /var/lib/logstash/dead_letter_queue/main/

# DLQ event'larını yeniden işlemek için ayrı pipeline
# /etc/logstash/conf.d/dlq-reprocess.conf
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}

filter {
  # Neden başarısız olduğunu logla
  mutate {
    add_field => {
      "dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}"
      "original_plugin" => "%{[@metadata][dead_letter_queue][plugin_type]}"
    }
  }
}

output {
  # Düzeltilmiş veriyi tekrar gönder veya incele
  file {
    path => "/var/log/logstash/dlq-events-%{+YYYY-MM-dd}.log"
    codec => json_lines
  }
}

Multiple Pipeline Konfigürasyonu

Büyük ortamlarda farklı pipeline’ları izole etmek isteyebilirsin. Bu hem resource yönetimi hem de izolasyon açısından önemli:

# /etc/logstash/pipelines.yml

- pipeline.id: nginx-pipeline
  path.config: "/etc/logstash/conf.d/nginx-pipeline.conf"
  pipeline.workers: 4
  pipeline.batch.size: 300
  queue.type: persisted

- pipeline.id: security-pipeline
  path.config: "/etc/logstash/conf.d/security-pipeline.conf"
  pipeline.workers: 2
  pipeline.batch.size: 100
  queue.type: persisted
  queue.max_bytes: 2gb

- pipeline.id: app-logs-pipeline
  path.config: "/etc/logstash/conf.d/app-pipeline.conf"
  pipeline.workers: 6
  pipeline.batch.size: 500
  queue.type: persisted

Grok Pattern Geliştirme

Custom pattern’lar yazmak bazen kaçınılmaz olur. Özellikle kurumsal uygulamaların kendine özgü log formatları için:

# /etc/logstash/patterns/custom-patterns
MYAPP_LOG [%{TIMESTAMP_ISO8601:timestamp}] [%{LOGLEVEL:level}] [%{DATA:thread}] %{JAVACLASS:class} - %{GREEDYDATA:log_message}

CUSTOM_ERROR (?i:error|exception|fail|fatal|critical)

DB_SLOW_QUERY # Time: %{NUMBER:query_time}ss+# Lock: %{NUMBER:lock_time}ss+# Rows_sent: %{NUMBER:rows_sent}s+# Rows_examined: %{NUMBER:rows_examined}

# Pipeline'da kullan
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{MYAPP_LOG}"
    }
  }
}

Grok pattern’larını test etmek için Kibana’daki Grok Debugger’ı veya komut satırından:

# Test için minimal pipeline çalıştır
echo '[2024-01-15T10:30:00.000Z] [ERROR] [main-thread] com.example.App - Database connection failed' | 
/usr/share/logstash/bin/logstash 
  --log.level quiet 
  -e 'input { stdin { } }
      filter {
        grok {
          match => { "message" => "[%{TIMESTAMP_ISO8601:ts}] [%{LOGLEVEL:level}] [%{DATA:thread}] %{JAVACLASS:class} - %{GREEDYDATA:msg}" }
        }
      }
      output { stdout { codec => rubydebug } }'

Sonuç

Logstash, doğru kurulduğunda ve configure edildiğinde güçlü bir veri boru hattı oluşturur. Bu yazıda ele aldıklarımızı özetleyelim:

  • Kurulum: Hem Debian/Ubuntu hem RHEL için paket tabanlı kurulum ve temel servis yönetimi
  • Pipeline yapısı: Input, filter, output üçlüsünün nasıl çalıştığı
  • Gerçek senaryolar: Nginx log parse, çoklu kaynak yönetimi ve koşullu yönlendirme
  • Güvenlik: SSL/TLS konfigürasyonu, keystore kullanımı ve minimum yetki prensibine uygun kullanıcı oluşturma
  • Performance: JVM tuning, persistent queue ve batch boyutu optimizasyonu
  • İzleme: API üzerinden pipeline istatistikleri ve dead letter queue yönetimi

Production’a almadan önce birkaç kritik hatırlatma: Persistent queue’yu mutlaka aktif et, veri kaybı yaşarsın. Logstash’e verdiğin heap boyutunu aşırıya kaçırma, JVM GC sorunları yaratır. Elasticsearch’e yazan user için minimum yetki prensibini uygula. Pipeline’ları bölümle, bir pipeline çökerse diğerleri etkilenmesin.

Bir sonraki adım olarak Kibana’da Logstash pipeline monitoring’i aktif etmeni ve Index Lifecycle Management politikalarını Logstash output’larıyla entegre etmeni öneririm. ELK Stack’in gücü bu üç bileşenin bir arada iyi çalışmasından geliyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir