Logstash ile Veri Toplama ve Elasticsearch’e Gönderme
Elasticsearch kurulumunu tamamladın, index’leri oluşturdun, cluster sağlıklı çalışıyor. Peki ya veriler? Sunucularından, uygulamalarından, network cihazlarından akan log’ları Elasticsearch’e nasıl aktaracaksın? İşte tam bu noktada Logstash devreye giriyor. ELK Stack’in “L”si olan Logstash, veriyi alır, işler ve istediğin hedefe gönderir. Bu yazıda gerçek dünya senaryoları üzerinden Logstash pipeline’larını kuracak, optimize edecek ve production ortamında kullanılabilir hale getireceğiz.
Logstash Nedir ve Neden Kullanırız
Logstash özünde bir ETL (Extract, Transform, Load) aracıdır. Ham veriyi alır, anlam kazandırır ve hedefine gönderir. Sadece Elasticsearch ile çalışmaz; Kafka’ya, S3’e, dosyaya, hatta başka bir Logstash instance’ına veri gönderebilir. Ama biz bugün Elasticsearch entegrasyonuna odaklanacağız.
Neden Filebeat veya direkt API yerine Logstash kullanıyoruz diye sorabilirsin. Cevap basit: veri dönüşümü. Nginx log’undaki IP adresini GeoIP verisine çevirmek, Apache log satırını parse edip anlamlı field’lara bölmek, hassas verileri maskelemek, birden fazla kaynaktan gelen veriyi normalize etmek. Bunların hepsini Logstash pipeline içinde halledersin.
Kurulum
Debian/Ubuntu üzerinde kuruluma başlayalım. Önce Java gereksinimi:
# Java kurulumu (Logstash 8.x için Java 11 veya 17 gerekli)
sudo apt update
sudo apt install -y openjdk-17-jdk
# Java versiyonunu kontrol et
java -version
# Elastic GPG key ve repository ekle
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
# Logstash kur
sudo apt update
sudo apt install -y logstash
# Servisi etkinleştir
sudo systemctl enable logstash
sudo systemctl start logstash
RHEL/CentOS için:
# RPM repository ekle
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo << 'EOF'
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
sudo yum install -y logstash
sudo systemctl enable --now logstash
Kurulum sonrası önemli dizinler:
- /etc/logstash/: Ana konfigürasyon dizini
- /etc/logstash/conf.d/: Pipeline konfigürasyon dosyaları buraya gider
- /var/log/logstash/: Log dosyaları
- /usr/share/logstash/: Uygulama dosyaları
- /var/lib/logstash/: Veri ve queue dosyaları
Pipeline Mimarisi: Input, Filter, Output
Her Logstash pipeline üç bölümden oluşur. Bunu bir fabrika bandı gibi düşün: hammadde girer (input), işlenir (filter), ürün çıkar (output).
Temel Pipeline Yapısı
# /etc/logstash/conf.d/basic-pipeline.conf
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
# Boş satırları atla
if [message] =~ /^s*$/ {
drop { }
}
# Timestamp ekle
mutate {
add_field => {
"pipeline_timestamp" => "%{@timestamp}"
"logstash_host" => "${HOSTNAME}"
}
}
}
output {
elasticsearch {
hosts => ["https://localhost:9200"]
user => "logstash_internal"
password => "${LOGSTASH_ES_PASSWORD}"
ssl_certificate_authorities => "/etc/logstash/certs/ca.crt"
index => "logs-%{+YYYY.MM.dd}"
}
}
Konfigürasyonu test et:
# Syntax kontrolü
sudo -u logstash /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/basic-pipeline.conf
# Çıktı başarılıysa şunu görürsün:
# Configuration OK
Gerçek Dünya Senaryosu 1: Nginx Access Log’larını Parse Etmek
En yaygın kullanım senaryolarından biri web sunucu log’larını işlemek. Ham Nginx log satırı şöyle görünür:
192.168.1.100 - john [10/Oct/2024:13:55:36 +0300] "GET /api/users HTTP/1.1" 200 2326 "https://example.com" "Mozilla/5.0..."
Bu satırı anlamlı field’lara bölmek için Grok filter kullanıyoruz:
# /etc/logstash/conf.d/nginx-pipeline.conf
input {
beats {
port => 5044
type => "nginx_access"
}
# Alternatif: Doğrudan dosyadan oku
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
type => "nginx_access"
sincedb_path => "/var/lib/logstash/sincedb_nginx"
}
}
filter {
if [type] == "nginx_access" {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
overwrite => ["message"]
}
# Timestamp'i düzgün parse et
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
remove_field => ["timestamp"]
}
# Response code'u integer'a çevir
mutate {
convert => {
"response" => "integer"
"bytes" => "integer"
}
}
# GeoIP bilgisi ekle
geoip {
source => "clientip"
target => "geoip"
add_field => {
"[geoip][location_type]" => "Point"
}
}
# User agent parse et
useragent {
source => "agent"
target => "user_agent"
}
# 4xx ve 5xx hataları işaretle
if [response] >= 400 {
mutate {
add_tag => ["error_response"]
add_field => { "is_error" => "true" }
}
}
# Gereksiz field'ları temizle
mutate {
remove_field => ["host", "agent", "input", "log"]
}
}
}
output {
if [type] == "nginx_access" {
elasticsearch {
hosts => ["https://es-node1:9200", "https://es-node2:9200"]
user => "logstash_writer"
password => "${ES_PASSWORD}"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
index => "nginx-access-%{+YYYY.MM}"
# ILM kullanıyorsan
ilm_rollover_alias => "nginx-access"
ilm_pattern => "000001"
ilm_policy => "nginx-logs-policy"
template_name => "nginx-access-template"
}
}
}
Gerçek Dünya Senaryosu 2: Çoklu Kaynak ve Koşullu Yönlendirme
Production ortamında genellikle farklı kaynaklardan veri alıp farklı index’lere yönlendirmen gerekir. Uygulama log’ları, sistem log’ları ve güvenlik log’larının hepsi farklı index’lere gitmeli:
# /etc/logstash/conf.d/multi-source-pipeline.conf
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
ssl_verify_mode => "peer"
codec => json
}
tcp {
port => 5000
codec => json_lines
type => "syslog"
}
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["app-logs", "security-events"]
group_id => "logstash-consumer-group"
auto_offset_reset => "earliest"
codec => json
consumer_threads => 4
type => "kafka_input"
}
}
filter {
# Beats'ten gelen Filebeat verisi için
if [@metadata][beat] == "filebeat" {
if [fields][log_type] == "application" {
mutate { add_field => { "[@metadata][target_index]" => "app-logs" } }
# JSON formatındaki application log'larını parse et
json {
source => "message"
target => "app"
skip_on_invalid_json => true
}
# Hata seviyelerine göre tag ekle
if [app][level] == "ERROR" or [app][level] == "FATAL" {
mutate { add_tag => ["alert_needed"] }
}
}
if [fields][log_type] == "security" {
mutate { add_field => { "[@metadata][target_index]" => "security-events" } }
# SSH brute force tespiti
if [message] =~ /Failed password/ {
grok {
match => {
"message" => "Failed password for %{USER:failed_user} from %{IP:source_ip}"
}
}
mutate { add_tag => ["brute_force_attempt"] }
geoip { source => "source_ip" }
}
}
}
# Syslog için
if [type] == "syslog" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}"
}
}
mutate { add_field => { "[@metadata][target_index]" => "syslog" } }
date {
match => ["syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss"]
}
}
}
output {
elasticsearch {
hosts => ["https://es-master:9200"]
user => "logstash_writer"
password => "${ES_PASSWORD}"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
action => "index"
}
# Kritik hataları ayrıca stdout'a yaz (debug için)
if "alert_needed" in [tags] {
stdout {
codec => rubydebug
}
}
}
Logstash Güvenlik Ayarları
Production’da güvenlik ihmal edilemez. Özellikle Elasticsearch 8.x ile SSL zorunlu:
# Logstash için özel kullanıcı oluştur (Elasticsearch Kibana'dan veya API ile)
# Kibana'dan: Management > Security > Users
# API ile kullanıcı oluşturma
curl -X POST "https://localhost:9200/_security/user/logstash_writer"
-H "Content-Type: application/json"
-u elastic:changeme
--cacert /etc/elasticsearch/certs/ca.crt
-d '{
"password" : "logstash_strong_password_123!",
"roles" : ["logstash_writer"],
"full_name" : "Logstash Writer User"
}'
# logstash_writer rolünü oluştur
curl -X POST "https://localhost:9200/_security/role/logstash_writer"
-H "Content-Type: application/json"
-u elastic:changeme
--cacert /etc/elasticsearch/certs/ca.crt
-d '{
"cluster": ["monitor", "manage_index_templates", "manage_ilm"],
"indices": [
{
"names": ["logs-*", "nginx-*", "app-*", "security-*", "syslog-*"],
"privileges": ["write", "create", "create_index", "manage", "auto_configure"]
}
]
}'
Logstash keystore ile şifreleri güvenli sakla:
# Keystore oluştur
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
# Şifreleri keystore'a ekle
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add ES_PASSWORD
# Şifreyi interaktif olarak gir
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_ES_PASSWORD
# Keystore içeriğini listele
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash list
Performance Tuning
Logstash varsayılan ayarlarla production yükünü kaldıramaz. /etc/logstash/logstash.yml dosyasını düzenle:
# /etc/logstash/logstash.yml
# Worker thread sayısı (CPU core sayısına eşit veya 2x)
pipeline.workers: 8
# Her worker'ın batch'te işleyeceği event sayısı
pipeline.batch.size: 500
# Batch için bekleme süresi (ms)
pipeline.batch.delay: 50
# Persistent queue aktif et (veri kaybını önler)
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
path.queue: /var/lib/logstash/queue
# Dead letter queue (parse edilemeyen event'lar için)
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
path.dead_letter_queue: /var/lib/logstash/dead_letter_queue
# Log seviyesi
log.level: warn
JVM heap ayarları için /etc/logstash/jvm.options:
# /etc/logstash/jvm.options
# Sunucu RAM'inin 4'te 1'i ile yarısı arası iyi bir başlangıç
-Xms2g
-Xmx2g
# G1GC kullan (büyük heap için daha iyi)
-XX:+UseG1GC
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
Monitoring ve Sorun Giderme
Pipeline sağlığını izlemek için Logstash API’sini kullanabilirsin:
# Pipeline istatistiklerini görüntüle
curl -s http://localhost:9600/_node/stats/pipelines | python3 -m json.tool
# Event işleme hızını kontrol et
curl -s http://localhost:9600/_node/stats |
python3 -c "import sys, json; d=json.load(sys.stdin);
print('Events in:', d['pipeline']['events']['in']);
print('Events out:', d['pipeline']['events']['out']);
print('Events filtered:', d['pipeline']['events']['filtered'])"
# Plugin istatistikleri
curl -s http://localhost:9600/_node/stats/pipelines?pretty
# Logstash hot threads (performans sorunu varsa)
curl -s http://localhost:9600/_node/hot_threads
# Logstash canlı log takibi
sudo journalctl -u logstash -f
# Hata loglarını filtrele
sudo grep -i "error|warn|exception" /var/log/logstash/logstash-plain.log | tail -50
Dead letter queue’daki başarısız event’ları işlemek için:
# Dead letter queue'yu monitor et
ls -lh /var/lib/logstash/dead_letter_queue/main/
# DLQ event'larını yeniden işlemek için ayrı pipeline
# /etc/logstash/conf.d/dlq-reprocess.conf
input {
dead_letter_queue {
path => "/var/lib/logstash/dead_letter_queue"
commit_offsets => true
pipeline_id => "main"
}
}
filter {
# Neden başarısız olduğunu logla
mutate {
add_field => {
"dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}"
"original_plugin" => "%{[@metadata][dead_letter_queue][plugin_type]}"
}
}
}
output {
# Düzeltilmiş veriyi tekrar gönder veya incele
file {
path => "/var/log/logstash/dlq-events-%{+YYYY-MM-dd}.log"
codec => json_lines
}
}
Multiple Pipeline Konfigürasyonu
Büyük ortamlarda farklı pipeline’ları izole etmek isteyebilirsin. Bu hem resource yönetimi hem de izolasyon açısından önemli:
# /etc/logstash/pipelines.yml
- pipeline.id: nginx-pipeline
path.config: "/etc/logstash/conf.d/nginx-pipeline.conf"
pipeline.workers: 4
pipeline.batch.size: 300
queue.type: persisted
- pipeline.id: security-pipeline
path.config: "/etc/logstash/conf.d/security-pipeline.conf"
pipeline.workers: 2
pipeline.batch.size: 100
queue.type: persisted
queue.max_bytes: 2gb
- pipeline.id: app-logs-pipeline
path.config: "/etc/logstash/conf.d/app-pipeline.conf"
pipeline.workers: 6
pipeline.batch.size: 500
queue.type: persisted
Grok Pattern Geliştirme
Custom pattern’lar yazmak bazen kaçınılmaz olur. Özellikle kurumsal uygulamaların kendine özgü log formatları için:
# /etc/logstash/patterns/custom-patterns
MYAPP_LOG [%{TIMESTAMP_ISO8601:timestamp}] [%{LOGLEVEL:level}] [%{DATA:thread}] %{JAVACLASS:class} - %{GREEDYDATA:log_message}
CUSTOM_ERROR (?i:error|exception|fail|fatal|critical)
DB_SLOW_QUERY # Time: %{NUMBER:query_time}ss+# Lock: %{NUMBER:lock_time}ss+# Rows_sent: %{NUMBER:rows_sent}s+# Rows_examined: %{NUMBER:rows_examined}
# Pipeline'da kullan
filter {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => {
"message" => "%{MYAPP_LOG}"
}
}
}
Grok pattern’larını test etmek için Kibana’daki Grok Debugger’ı veya komut satırından:
# Test için minimal pipeline çalıştır
echo '[2024-01-15T10:30:00.000Z] [ERROR] [main-thread] com.example.App - Database connection failed' |
/usr/share/logstash/bin/logstash
--log.level quiet
-e 'input { stdin { } }
filter {
grok {
match => { "message" => "[%{TIMESTAMP_ISO8601:ts}] [%{LOGLEVEL:level}] [%{DATA:thread}] %{JAVACLASS:class} - %{GREEDYDATA:msg}" }
}
}
output { stdout { codec => rubydebug } }'
Sonuç
Logstash, doğru kurulduğunda ve configure edildiğinde güçlü bir veri boru hattı oluşturur. Bu yazıda ele aldıklarımızı özetleyelim:
- Kurulum: Hem Debian/Ubuntu hem RHEL için paket tabanlı kurulum ve temel servis yönetimi
- Pipeline yapısı: Input, filter, output üçlüsünün nasıl çalıştığı
- Gerçek senaryolar: Nginx log parse, çoklu kaynak yönetimi ve koşullu yönlendirme
- Güvenlik: SSL/TLS konfigürasyonu, keystore kullanımı ve minimum yetki prensibine uygun kullanıcı oluşturma
- Performance: JVM tuning, persistent queue ve batch boyutu optimizasyonu
- İzleme: API üzerinden pipeline istatistikleri ve dead letter queue yönetimi
Production’a almadan önce birkaç kritik hatırlatma: Persistent queue’yu mutlaka aktif et, veri kaybı yaşarsın. Logstash’e verdiğin heap boyutunu aşırıya kaçırma, JVM GC sorunları yaratır. Elasticsearch’e yazan user için minimum yetki prensibini uygula. Pipeline’ları bölümle, bir pipeline çökerse diğerleri etkilenmesin.
Bir sonraki adım olarak Kibana’da Logstash pipeline monitoring’i aktif etmeni ve Index Lifecycle Management politikalarını Logstash output’larıyla entegre etmeni öneririm. ELK Stack’in gücü bu üç bileşenin bir arada iyi çalışmasından geliyor.
