Logstash’te Redis ve Kafka ile Buffer Kullanımı

Prodüksiyonda yüksek trafikli bir ortamda Logstash çalıştırıyorsanız, büyük ihtimalle şu sahneyle tanışmışsınızdır: Log kaynakları patlar, Logstash pipeline’ı tıkanır, Elasticsearch indexleme ayak uyduramaz ve sonunda ya loglar düşmeye başlar ya da Logstash kendini kapatır. Ben bu durumu ilk kez 3 yıl önce, Black Friday döneminde e-ticaret altyapısında yaşadım. O geceden sonra buffer stratejisi benim için opsiyonel olmaktan çıktı.

Bu yazıda Redis ve Kafka’yı Logstash pipeline’larında nasıl buffer olarak kullanacağınızı, hangi senaryoda hangisinin daha uygun olduğunu ve gerçek dünya konfigürasyonlarını aktaracağım.

Neden Buffer Gerekiyor?

Logstash’in varsayılan davranışı şu şekilde çalışır: input alır, filter uygular, output’a gönderir. Bu pipeline doğrusal ve senkronize. Yani output tarafında bir yavaşlama olduğunda (Elasticsearch’ün disk I/O’su dolduğunda veya cluster rebalancing yaparken), bu gecikme input tarafına kadar yansır.

Backpressure meselesi burada devreye giriyor. Filebeat veya diğer shipper’lar Logstash’e bağlandığında, Logstash onları “dur, yavaşla” diyerek geri basınç uygular. Küçük ölçekte bu yönetilebilir. Ama binlerce sunucudan saniyede onbinlerce log satırı geliyorsa, bu mekanizma yetersiz kalır.

Bir de Logstash’in kendi in-memory queue’su var. Varsayılan olarak bu queue oldukça küçük ve Logstash process’i öldüğünde kaybolur. Persistent queue özelliği var ama o da kendi kısıtlarını getiriyor.

Buffer katmanı eklemek bu problemi şu şekilde çözer:

  • Shipper’lar buffer’a yazar, Logstash buffer’dan okur
  • Logstash yavaşladığında veya kapandığında mesajlar buffer’da bekler
  • Logstash tekrar ayağa kalktığında kaldığı yerden devam eder
  • Input ve output hızları birbirinden bağımsız hale gelir

Redis Buffer: Basit, Hızlı, Etkili

Redis, özellikle orta ölçekli ortamlar için mükemmel bir buffer çözümü. Kurulumu kolay, operasyonel yükü düşük ve Logstash’in native desteği var.

Redis Kurulumu ve Temel Konfigürasyon

Öncelikle Redis’i dedicated bir node’da çalıştırmanızı öneririm. Logstash ile aynı sunucuya koymak, kaynak çekişmesi yaratır.

# Redis kurulumu (RHEL/CentOS 8)
dnf install -y redis
systemctl enable redis
systemctl start redis

# Redis konfigürasyonu - /etc/redis/redis.conf
# Bind sadece internal IP'ye
bind 0.0.0.0
protected-mode yes

# Memory limiti koy, yoksa sistem belleğini yer
maxmemory 4gb
maxmemory-policy allkeys-lru

# Persistence için RDB snapshot
save 900 1
save 300 10
save 60 10000

# AOF aktif et (daha güvenli)
appendonly yes
appendfsync everysec

Redis’i buffer olarak kullanırken kritik bir karar: list mi kullanacaksınız, channel mi? Logstash Redis input/output plugin’i list üzerinden çalışır ve bu production için doğru tercih. Channel (pub/sub) at-most-once semantiğine sahip, list ise siz alana kadar veriyi tutar.

Logstash Redis Output Konfigürasyonu

Filebeat veya diğer kaynaklar doğrudan Logstash’e değil, Redis’e yazacak. Bunun için önce Logstash’i Redis output ile konfigüre ederiz. Ama asıl senaryo şu: log kaynakları -> Redis -> Logstash -> Elasticsearch.

Filebeat tarafını geçelim, Logstash’in Redis’ten okumasını konfigüre edelim:

# /etc/logstash/conf.d/01-redis-input.conf
input {
  redis {
    host => "redis-buffer.internal"
    port => 6379
    password => "guclu-bir-sifre"
    data_type => "list"
    key => "logstash:buffer"
    batch_count => 125
    threads => 2
    codec => json
  }
}

batch_count: Tek seferde kaç mesaj çekeceğini belirler. Elasticsearch’e bulk indexing yapıyorsanız bu değeri pipeline batch size ile uyumlu tutun.

threads: Redis input için birden fazla thread açabilirsiniz. Ama dikkat: aynı key’i birden fazla thread okuyorsa, mesajlar sırayla gelmeyebilir. Sıra önemliyse tek thread kullanın.

Filebeat’ten Redis’e Yazmak

Filebeat’i doğrudan Redis’e yazmak için output konfigürasyonu:

# /etc/filebeat/filebeat.yml
output.redis:
  hosts: ["redis-buffer.internal:6379"]
  password: "guclu-bir-sifre"
  key: "logstash:buffer"
  db: 0
  timeout: 5
  worker: 2
  loadbalance: true
  # Birden fazla Redis varsa
  # hosts: ["redis1:6379", "redis2:6379"]

Bu yapılandırmada Filebeat, Redis listesine RPUSH yapar, Logstash BLPOP ile kuyruktan çeker. Klasik producer-consumer pattern.

Redis Sentinel ile High Availability

Tek Redis node’u single point of failure. Production’da Redis Sentinel veya Cluster kullanın:

# Sentinel konfigürasyonu - /etc/redis/sentinel.conf
sentinel monitor mymaster redis-master.internal 6379 2
sentinel auth-pass mymaster guclu-bir-sifre
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1

# Logstash konfigürasyonunda Sentinel kullanımı
# /etc/logstash/conf.d/01-redis-sentinel-input.conf
input {
  redis {
    host => "redis-sentinel-1.internal"
    port => 26379
    password => "guclu-bir-sifre"
    data_type => "list"
    key => "logstash:buffer"
    # Sentinel için ayrı bir sentinel_port parametresi yok
    # Logstash redis plugin doğrudan sentinel desteklemiyor
    # Bu durumda HAProxy veya Twemproxy önüne koyun
    batch_count => 125
    codec => json
  }
}

Logstash’in Redis plugin’i native Sentinel desteği sunmuyor. Bu noktada önüne HAProxy koyarak aktif node’u expose etmek en pratik çözüm.

Kafka Buffer: Yüksek Hacim, Yüksek Güvenilirlik

Redis güzel ama onbinlerce sunucu, petabyte-ölçekli log altyapısı veya birden fazla consumer (farklı log pipeline’ları) söz konusuysa Kafka tartışmasız kazanır.

Kafka’nın log management için öne çıkardığı özellikler şunlar:

  • Consumer group’lar sayesinde aynı log stream’ini birden fazla sistem okuyabilir
  • Retention policy ile loglar disk’te tutulur, Logstash crash etse bile kayıp olmaz
  • Partition’lar sayesinde paralel okuma/yazma
  • Replication factor ile veri güvenliği

Kafka Cluster Temel Kurulumu

# Kafka kurulumu için önce Java gerekli
dnf install -y java-11-openjdk-devel

# Kafka binary indir ve çıkart
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz -C /opt/
mv /opt/kafka_2.13-3.6.0 /opt/kafka

# Topic oluşturma - partition sayısı Logstash consumer sayısıyla eşleşmeli
/opt/kafka/bin/kafka-topics.sh 
  --create 
  --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 
  --replication-factor 3 
  --partitions 6 
  --topic logs.application 
  --config retention.ms=86400000 
  --config retention.bytes=10737418240

# Topic durumunu kontrol et
/opt/kafka/bin/kafka-topics.sh 
  --describe 
  --bootstrap-server kafka1:9092 
  --topic logs.application

Retention ayarları kritik: retention.ms=86400000 bir günlük retention demek. Logstash 1 gün çevrimdışı kalsa bile loglar kaybolmaz. retention.bytes ise partition başına maksimum disk kullanımı.

Logstash Kafka Input Konfigürasyonu

# /etc/logstash/conf.d/01-kafka-input.conf
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["logs.application", "logs.nginx", "logs.system"]
    group_id => "logstash-production"
    consumer_threads => 6
    codec => json
    auto_offset_reset => "latest"
    enable_auto_commit => true
    auto_commit_interval_ms => "5000"
    session_timeout_ms => "30000"
    request_timeout_ms => "40000"
    max_poll_records => "500"
    fetch_max_bytes => "52428800"
    
    # SSL/TLS konfigürasyonu
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    jaas_path => "/etc/logstash/kafka_jaas.conf"
    ssl_truststore_location => "/etc/logstash/kafka.truststore.jks"
    ssl_truststore_password => "truststore-sifresi"
  }
}

consumer_threads ile partition sayısı arasındaki ilişkiyi anlamak önemli. Kafka’da bir partition aynı anda sadece bir consumer tarafından okunabilir. 6 partition varsa maksimum 6 thread işe yarar. 6’dan fazla thread açarsanız, fazladan thread’ler boşta bekler.

auto_offset_reset: latest yeni başlayan consumer’ların sadece yeni mesajları okuması demek. earliest ise topic başından okuma. Production’da yeni bir Logstash instance’ı ayağa kalkıyorsa latest kullanın, yoksa eski tüm logları yeniden işlersiniz.

Filebeat’ten Kafka’ya Yazmak

# /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/application/*.log
  fields:
    log_type: application
    environment: production
  fields_under_root: true
  multiline.pattern: '^d{4}-d{2}-d{2}'
  multiline.negate: true
  multiline.match: after

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: 'logs.%{[log_type]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  worker: 4
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s

required_acks: 1 broker’ın mesajı aldığını onaylaması demek. required_acks: -1 (all) ise tüm replica’ların onaylaması. Veri güvenliği açısından -1 daha iyi ama latency artar. Log şipperı için 1 genellikle yeterli.

Logstash Filter Pipeline

Buffer kullanırken filter pipeline’ı da iyi kurgularsanız, throughput üzerinde büyük etkisi olur:

# /etc/logstash/conf.d/02-filters.conf
filter {
  # Log tipine göre ayrıştırma
  if [log_type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} [%{HTTPDATE:timestamp}] "%{WORD:method} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status_code} %{NUMBER:bytes_sent}'
      }
      remove_field => ["message"]
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      convert => {
        "status_code" => "integer"
        "bytes_sent" => "integer"
      }
    }
  }

  if [log_type] == "application" {
    json {
      source => "message"
      target => "app"
    }
    mutate {
      remove_field => ["message", "agent", "ecs"]
    }
  }
  
  # Gürültülü logları düşür
  if [status_code] == 200 and [request] =~ "/health" {
    drop {}
  }
}

Redis mi Kafka mı? Karar Kriterleri

Bu sorunun net bir cevabı yok ama deneyimden gelen bazı yönlendirmeler var:

Redis’i tercih edin:

  • Günlük 50 GB’ın altında log hacminiz varsa
  • Ekibinizin Kafka operasyon deneyimi yoksa
  • Hızlı kurulum ve düşük operasyonel yük öncelikliyse
  • Tek bir Logstash cluster’ınız varsa
  • Proof of concept veya orta ölçekli prodüksiyon ortamı

Kafka’yı tercih edin:

  • Günlük 500 GB üzeri log hacmi söz konusuysa
  • Birden fazla downstream consumer (SIEM, veri ambarı, stream processing) varsa
  • Log replay veya reprocessing gerekebiliyorsa
  • Strict at-least-once veya exactly-once semantiği gerekiyorsa
  • Zaten Kafka altyapınız varsa (ek maliyet minimal olur)
  • Multi-datacenter log aggregation yapıyorsanız

Monitoring: Buffer Sağlığını Takip Etmek

Buffer koydunuz ama dolmaya başladığında haberdar olmazsanız, buffer’ın varlığının pek anlamı yok.

Redis İzleme

# Redis queue derinliğini izlemek için script
# /usr/local/bin/check_redis_queue.sh

#!/bin/bash
REDIS_HOST="redis-buffer.internal"
REDIS_PORT="6379"
REDIS_AUTH="guclu-bir-sifre"
KEY="logstash:buffer"
WARN_THRESHOLD=100000
CRIT_THRESHOLD=500000

QUEUE_DEPTH=$(redis-cli -h $REDIS_HOST -p $REDIS_PORT -a $REDIS_AUTH LLEN $KEY 2>/dev/null)

if [ -z "$QUEUE_DEPTH" ]; then
  echo "UNKNOWN: Redis'e baglanamadi"
  exit 3
fi

if [ "$QUEUE_DEPTH" -gt "$CRIT_THRESHOLD" ]; then
  echo "CRITICAL: Queue derinligi $QUEUE_DEPTH (limit: $CRIT_THRESHOLD)"
  exit 2
elif [ "$QUEUE_DEPTH" -gt "$WARN_THRESHOLD" ]; then
  echo "WARNING: Queue derinligi $QUEUE_DEPTH (limit: $WARN_THRESHOLD)"
  exit 1
else
  echo "OK: Queue derinligi $QUEUE_DEPTH"
  exit 0
fi

Kafka Consumer Lag İzleme

# Consumer lag kontrolü - bu değer büyürse Logstash yavaş tüketiyor demek
/opt/kafka/bin/kafka-consumer-groups.sh 
  --bootstrap-server kafka1:9092 
  --group logstash-production 
  --describe

# Çıktı formatı:
# GROUP                  TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# logstash-production    logs.application   0          1234567         1234890         323

# Prometheus için kafka_exporter veya Burrow kullanın
# Basit bir lag monitoring script:
#!/bin/bash
LAG=$(/opt/kafka/bin/kafka-consumer-groups.sh 
  --bootstrap-server kafka1:9092 
  --group logstash-production 
  --describe 2>/dev/null | 
  awk 'NR>1 {sum += $6} END {print sum}')

echo "Toplam consumer lag: $LAG"

if [ "$LAG" -gt 1000000 ]; then
  echo "KRITIK: Logstash tüketim kapasitesinin çok gerisinde!"
fi

Logstash Pipeline Metrikleri

# Logstash API üzerinden pipeline metriklerini çekme
curl -s http://localhost:9600/_node/stats/pipelines | python3 -m json.tool | grep -E '"events"|"duration"'

# Özellikle şu değerlere bakın:
# events.in - input event sayısı
# events.out - output event sayısı  
# events.filtered - filtrelenen event sayısı
# pipeline.batch_size - batch başına event sayısı

# Bu değerleri Grafana'ya göndermek için:
curl -s http://localhost:9600/_node/stats | 
  python3 -c "
import json, sys
stats = json.load(sys.stdin)
events = stats['pipeline']['events']
print(f'Logstash - Gelen: {events["in"]}, Giden: {events["out"]}, Filtreli: {events["filtered"]}')
"

Pratik Optimizasyon Notları

Yıllarca bu stack’le uğraştıktan sonra birikmiş birkaç not:

Logstash pipeline.workers ve batch_size ayarı: Bu ikisi throughput üzerinde en çok etkili parametreler.

# /etc/logstash/logstash.yml
pipeline.workers: 8          # CPU core sayısı kadar veya 2 katı
pipeline.batch.size: 500     # Elasticsearch bulk size ile uyumlu tut
pipeline.batch.delay: 50     # ms - batch tamamlanmayı bekleme süresi

JVM heap ayarı: Logstash hafıza açısından aç bir uygulama. Sunucu RAM’inin yarısını verin ama 32 GB’ı geçmeyin.

# /etc/logstash/jvm.options dosyasında:
-Xms8g
-Xmx8g

Dead letter queue: İşlenemeyen event’lar için mutlaka aktif edin. Yoksa bozuk formatlar pipeline’ı tıkayabilir.

# /etc/logstash/logstash.yml
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1024mb
path.dead_letter_queue: /var/log/logstash/dlq

Sonuç

Buffer katmanı, Logstash altyapısının en çok göz ardı edilen ama en kritik parçalarından biri. Redis ile başlayın, büyüdükçe Kafka’ya geçiş yapın. Önemli olan şu: buffer koymak problemi çözmez, sadece size sorunla baş etmek için zaman kazandırır.

Elasticsearch yavaş mı? Logstash filtreleri verimsiz mi? Bunları buffer arkasına saklayabilirsiniz ama eninde sonunda çözmeniz gerekir. Buffer’ın değeri burada: kriz anında sistemi ayakta tutmak ve gece 3’te alarm almak yerine sabah soğukkanlılıkla problemi analiz etmek.

Consumer lag ve queue derinliği metriklerini alerting sistemine mutlaka ekleyin. Bu değerler büyümeye başladığında pipeline kapasitenizi artırmanız ya da downstream sistemlerinizi optimize etmeniz gerektiğinin erken uyarısıdır. Buffer dolduğunda artık uyarı için geç kalmış olabilirsiniz.

Son olarak: senaryonuzu test edin. Elasticsearch’ü simüle edilmiş yük altında durdurun, buffer’ın devreye girdiğini ve Elasticsearch geri geldiğinde hiçbir log kaybı olmadan devam ettiğini gözlemleyin. Bu testi hiç yapmadan prodüksiyon güvenilirliğine güvenmek, yedeklerini hiç test etmeden backup sahibi olduğunu sanmak gibidir.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir