AWS Athena ile S3 Üzerindeki Verileri SQL ile Sorgulama
Veri miktarı her geçen gün katlanarak büyüyor ve bu veriler çoğunlukla S3 bucket’larında ham halde bekliyor. Geleneksel yaklaşımla bu verileri sorgulamak için bir veritabanına aktarmak, ETL pipeline’ları kurmak ve ciddi bir altyapı maliyetine katlanmak gerekiyordu. AWS Athena tam burada devreye giriyor: S3’teki verilerinizi olduğu yerde SQL ile sorgulayabiliyorsunuz, herhangi bir sunucu kurmadan, herhangi bir cluster yönetmeden. Sysadmin olarak bakıldığında bu çok güçlü bir araç, çünkü log analizinden maliyet raporlarına kadar pek çok operasyonel problemi anında çözebiliyorsunuz.
Athena Nedir ve Nasıl Çalışır
Athena, Apache Presto üzerine inşa edilmiş serverless bir sorgu servisidir. Siz SQL yazarsınız, Athena S3’teki veriyi okur, işler ve sonucu döndürür. Ödeme modeli de buna göre şekilleniyor: sadece taranan veri miktarı üzerinden ücretlendiriliyorsunuz, TB başına 5 dolar. Bu yüzden veriyi doğru formatlamak ve partition’lamak hem maliyet hem de performans açısından kritik öneme sahip.
Athena’nın arka planda kullandığı katalog servisi AWS Glue Data Catalog‘dur. Tablolar, şemalar ve metadata burada tutulur. Glue Crawler kullanarak S3’teki veriyi otomatik keşfedebilir ya da tabloları manuel olarak tanımlayabilirsiniz.
Temel çalışma akışı şöyle:
- S3’te veri durur (JSON, CSV, Parquet, ORC, Avro formatlarında olabilir)
- Glue Data Catalog’da tablo tanımı yapılır
- Athena bu tablo üzerinde SQL sorgusu çalıştırır
- Sonuçlar belirlenen S3 output bucket’ına yazılır
- Siz sonuçları okursunuz
İlk Kurulum ve Yapılandırma
Athena’yı kullanmaya başlamadan önce birkaç şeyi ayarlamanız gerekiyor. En önemli adım sorgu sonuçlarının yazılacağı S3 bucket’ını belirlemek.
# Athena sonuç bucket'ı oluşturma
aws s3 mb s3://my-athena-query-results-bucket --region eu-west-1
# Bucket'a lifecycle policy ekleyelim, sonuçlar 30 gün sonra silinsin
cat > lifecycle-policy.json << 'EOF'
{
"Rules": [
{
"ID": "DeleteAthenaResults",
"Status": "Enabled",
"Filter": {
"Prefix": ""
},
"Expiration": {
"Days": 30
}
}
]
}
EOF
aws s3api put-bucket-lifecycle-configuration
--bucket my-athena-query-results-bucket
--lifecycle-configuration file://lifecycle-policy.json
Athena workgroup ayarlarını CLI üzerinden da yapabilirsiniz:
# Athena workgroup oluşturma ve output location belirleme
aws athena create-work-group
--name "production-workgroup"
--configuration "ResultConfiguration={OutputLocation=s3://my-athena-query-results-bucket/results/},EnforceWorkGroupConfiguration=true,PublishCloudWatchMetricsEnabled=true,BytesScannedCutoffPerQuery=1073741824"
--description "Production sorgu workgroup - max 1GB tarama limiti"
Burada BytesScannedCutoffPerQuery parametresini 1 GB olarak ayarladım. Bu önemli bir maliyet koruma mekanizması; birisi yanlışlıkla büyük bir tabloyu tam scan eden bir sorgu çalıştırdığında sizi kurtarıyor.
Glue Data Catalog ile Tablo Oluşturma
Şimdi gerçek dünya senaryosuna geçelim. Diyelim ki uygulama loglarınız S3’te JSON formatında tutuyorsunuz ve bu logları sorgulamak istiyorsunuz.
Log dizin yapınız şöyle olsun:
s3://my-app-logs/
app-logs/
year=2024/
month=01/
day=15/
app-log-2024-01-15-00.json.gz
app-log-2024-01-15-01.json.gz
Bu partition yapısı Athena performansı için kritik. Hive-style partition kullandığınızda (year=2024/month=01/day=15/ formatı) Athena sorgunuzda WHERE year='2024' AND month='01' yazdığınızda sadece ilgili klasörleri tarar.
Şimdi bu veri için Athena tablosu oluşturalım:
-- Athena'da DDL sorgusu olarak çalıştırın
CREATE EXTERNAL TABLE IF NOT EXISTS app_logs.application_logs (
timestamp STRING,
level STRING,
service STRING,
message STRING,
user_id STRING,
request_id STRING,
duration_ms INT,
status_code INT,
error_type STRING
)
PARTITIONED BY (
year STRING,
month STRING,
day STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1',
'ignore.malformed.json' = 'TRUE'
)
LOCATION 's3://my-app-logs/app-logs/'
TBLPROPERTIES (
'has_encrypted_data' = 'false',
'projection.enabled' = 'true',
'projection.year.type' = 'integer',
'projection.year.range' = '2023,2025',
'projection.month.type' = 'integer',
'projection.month.range' = '1,12',
'projection.month.digits' = '2',
'projection.day.type' = 'integer',
'projection.day.range' = '1,31',
'projection.day.digits' = '2',
'storage.location.template' = 's3://my-app-logs/app-logs/year=${year}/month=${month}/day=${day}/'
);
Burada Partition Projection kullandım. Bu özellik Glue Catalog’da partition metadata’sı tutmak yerine Athena’nın partition’ları dinamik olarak hesaplamasını sağlıyor. MSCK REPAIR TABLE çalıştırmanıza gerek kalmıyor, yeni günlük loglar otomatik olarak sorgulanabilir hale geliyor.
Pratik Sysadmin Sorguları
Tablonuz hazır, artık gerçekten işe yarar sorgular yazabiliriz.
Son 7 günün hata loglarını çekme:
SELECT
year,
month,
day,
service,
error_type,
COUNT(*) as error_count,
AVG(duration_ms) as avg_duration
FROM app_logs.application_logs
WHERE
year = '2024'
AND month = '01'
AND day BETWEEN '08' AND '15'
AND level = 'ERROR'
GROUP BY year, month, day, service, error_type
ORDER BY error_count DESC
LIMIT 100;
Yavaş endpoint’leri tespit etme (1 saniyenin üzerindeki requestler):
SELECT
service,
message,
COUNT(*) as slow_request_count,
MIN(duration_ms) as min_ms,
MAX(duration_ms) as max_ms,
APPROX_PERCENTILE(duration_ms, 0.95) as p95_ms,
APPROX_PERCENTILE(duration_ms, 0.99) as p99_ms
FROM app_logs.application_logs
WHERE
year = '2024'
AND month = '01'
AND duration_ms > 1000
GROUP BY service, message
ORDER BY slow_request_count DESC;
APPROX_PERCENTILE fonksiyonu performans analizi için çok değerli, özellikle büyük dataset’lerde tam percentile hesabı yapmaktan çok daha hızlı çalışıyor.
AWS CloudTrail Loglarını Athena ile Analiz Etme
Bu başlı başına bir use case. Security auditing ve compliance açısından CloudTrail loglarını Athena ile sorgulamak son derece güçlü bir yaklaşım. AWS’nin sağladığı hazır tablo şemasını kullanabilirsiniz:
CREATE EXTERNAL TABLE cloudtrail_logs.trail_events (
eventVersion STRING,
userIdentity STRUCT<
type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
userName: STRING
>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIPAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
requestId STRING,
eventId STRING,
resources ARRAY<STRUCT<
arn: STRING,
accountId: STRING,
type: STRING
>>,
eventType STRING,
recipientAccountId STRING
)
PARTITIONED BY (region STRING, year STRING, month STRING, day STRING)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://my-cloudtrail-bucket/AWSLogs/123456789012/CloudTrail/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.region.type' = 'enum',
'projection.region.values' = 'eu-west-1,eu-central-1,us-east-1',
'projection.year.type' = 'integer',
'projection.year.range' = '2023,2025',
'projection.month.type' = 'integer',
'projection.month.range' = '1,12',
'projection.month.digits' = '2',
'projection.day.type' = 'integer',
'projection.day.range' = '1,31',
'projection.day.digits' = '2',
'storage.location.template' = 's3://my-cloudtrail-bucket/AWSLogs/123456789012/CloudTrail/${region}/${year}/${month}/${day}/'
);
Şimdi güvenlik açısından kritik sorgular:
-- Root account kullanımını tespit etme
SELECT
eventTime,
eventName,
sourceIPAddress,
userAgent,
awsRegion
FROM cloudtrail_logs.trail_events
WHERE
year = '2024'
AND month = '01'
AND userIdentity.type = 'Root'
AND eventName NOT IN ('ConsoleLogin')
ORDER BY eventTime DESC;
-- Başarısız API çağrılarını IP bazında gruplama (brute force tespiti)
SELECT
sourceIPAddress,
COUNT(*) as failed_attempts,
ARRAY_JOIN(ARRAY_AGG(DISTINCT eventName), ', ') as attempted_actions,
MIN(eventTime) as first_attempt,
MAX(eventTime) as last_attempt
FROM cloudtrail_logs.trail_events
WHERE
year = '2024'
AND month = '01'
AND day = '15'
AND errorCode IN ('AccessDenied', 'UnauthorizedOperation', 'InvalidClientTokenId')
GROUP BY sourceIPAddress
HAVING COUNT(*) > 10
ORDER BY failed_attempts DESC;
Bu tür sorgular bir security incident sırasında dakikalar içinde çalıştırılabilir ve paha biçilmez bilgi sağlar.
Parquet Formatı ve Maliyet Optimizasyonu
Eğer verileriniz üzerinde kontrolünüz varsa, JSON veya CSV yerine Parquet veya ORC kullanmalısınız. Bu iki format kolonsal (columnar) depolama yapısı kullandığından, Athena yalnızca sorgunuzda kullandığınız kolonları okur. Pratik etkisi çok büyük: bir tabloda 50 kolon olsa bile sadece 5 kolonu sorguluyorsanız, geri kalan 45 kolonun verisi hiç okunmaz.
AWS Glue ETL job’ı ile mevcut JSON loglarınızı Parquet’e dönüştürebilirsiniz:
# Glue ETL script - Python Shell Job olarak çalıştırın
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Kaynak JSON datasını oku
datasource = glueContext.create_dynamic_frame.from_catalog(
database="app_logs",
table_name="application_logs_json",
transformation_ctx="datasource"
)
# Parquet formatında yaz, Snappy compression ile
glueContext.write_dynamic_frame.from_options(
frame=datasource,
connection_type="s3",
connection_options={
"path": "s3://my-app-logs/app-logs-parquet/",
"partitionKeys": ["year", "month", "day"]
},
format="parquet",
format_options={
"compression": "snappy"
},
transformation_ctx="sink"
)
job.commit()
Deneyimlerime göre JSON’dan Parquet’e geçiş genellikle 60-85% daha az veri taranması anlamına geliyor. 1 TB JSON verisi için yılda 60 dolar Athena maliyeti varken, bu verinin Parquet karşılığı 8-15 dolara düşebiliyor.
Athena Sorgularını Boto3 ile Otomatikleştirme
Sysadmin olarak bazı sorguları periyodik çalıştırmak ya da sonuçları başka sistemlere aktarmak isteyebilirsiniz. Boto3 ile bu oldukça temiz yapılabiliyor:
#!/usr/bin/env python3
"""
Athena sorgu çalıştırıcı - günlük hata raporu için kullanılabilir
"""
import boto3
import time
import csv
import io
from datetime import datetime, timedelta
def run_athena_query(query, database, output_location, workgroup='primary'):
athena_client = boto3.client('athena', region_name='eu-west-1')
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': output_location},
WorkGroup=workgroup
)
query_execution_id = response['QueryExecutionId']
print(f"Sorgu başlatıldı: {query_execution_id}")
# Sorgu tamamlanana kadar bekle
while True:
result = athena_client.get_query_execution(
QueryExecutionId=query_execution_id
)
status = result['QueryExecution']['Status']['State']
if status == 'SUCCEEDED':
stats = result['QueryExecution']['Statistics']
scanned_mb = stats['DataScannedInBytes'] / (1024 * 1024)
print(f"Sorgu tamamlandı. Taranan veri: {scanned_mb:.2f} MB")
break
elif status in ['FAILED', 'CANCELLED']:
reason = result['QueryExecution']['Status'].get('StateChangeReason', 'Bilinmiyor')
raise Exception(f"Sorgu başarısız: {reason}")
print(f"Durum: {status}, bekleniyor...")
time.sleep(2)
return query_execution_id
def get_query_results(query_execution_id):
athena_client = boto3.client('athena', region_name='eu-west-1')
results = []
paginator = athena_client.get_paginator('get_query_results')
page_iterator = paginator.paginate(QueryExecutionId=query_execution_id)
headers = None
for page in page_iterator:
rows = page['ResultSet']['Rows']
if headers is None:
headers = [col['VarCharValue'] for col in rows[0]['Data']]
rows = rows[1:] # Header satırını atla
for row in rows:
values = [col.get('VarCharValue', '') for col in row['Data']]
results.append(dict(zip(headers, values)))
return results
# Kullanım örneği
yesterday = datetime.now() - timedelta(days=1)
query = f"""
SELECT
service,
error_type,
COUNT(*) as count
FROM app_logs.application_logs
WHERE
year = '{yesterday.strftime('%Y')}'
AND month = '{yesterday.strftime('%m')}'
AND day = '{yesterday.strftime('%d')}'
AND level = 'ERROR'
GROUP BY service, error_type
ORDER BY count DESC
LIMIT 50
"""
exec_id = run_athena_query(
query=query,
database='app_logs',
output_location='s3://my-athena-query-results-bucket/daily-reports/',
workgroup='production-workgroup'
)
results = get_query_results(exec_id)
for row in results:
print(f"Servis: {row['service']}, Hata: {row['error_type']}, Adet: {row['count']}")
Bu scripti bir Lambda function’a taşıyıp EventBridge ile günlük çalıştırabilirsiniz. Sonuçları SNS üzerinden Slack’e ya da email’e göndermek birkaç satır daha eklemek demek.
ALB Access Logları ile HTTP Analizi
Application Load Balancer logları S3’e yazıldığında Athena için mükemmel bir use case oluşturuyor. AWS’nin sağladığı hazır şemayı kullanabilirsiniz:
CREATE EXTERNAL TABLE alb_logs.access_logs (
type STRING,
time STRING,
elb STRING,
client_ip STRING,
client_port INT,
target_ip STRING,
target_port INT,
request_processing_time DOUBLE,
target_processing_time DOUBLE,
response_processing_time DOUBLE,
elb_status_code STRING,
target_status_code STRING,
received_bytes BIGINT,
sent_bytes BIGINT,
request_verb STRING,
request_url STRING,
request_proto STRING,
user_agent STRING,
ssl_cipher STRING,
ssl_protocol STRING,
target_group_arn STRING,
trace_id STRING,
domain_name STRING,
chosen_cert_arn STRING,
matched_rule_priority STRING,
request_creation_time STRING,
actions_executed STRING,
redirect_url STRING,
lambda_error_reason STRING,
target_port_list STRING,
target_status_code_list STRING,
classification STRING,
classification_reason STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1',
'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) "([^ ]*) (.*) (- |[^ ]*)" "([^"]*)" ([A-Z0-9-_]+) ([A-Za-z0-9.-]*) ([^ ]*) "([^"]*)" "([^"]*)" "([^"]*)" ([-.0-9]*) ([^ ]*) "([^"]*)" "([^"]*)" "([^ ]*)" "([^s]+?)" "([^s]+)" "([^ ]*)" "([^ ]*)"'
)
LOCATION 's3://my-alb-logs/AWSLogs/123456789012/elasticloadbalancing/eu-west-1/';
Bu tablo ile gerçek dünya analizleri yapabilirsiniz. Örneğin 5xx hataları hangi backend’den geliyor, hangi IP en fazla istek yapıyor, en yavaş endpoint’ler hangileri gibi sorular anında yanıt buluyor.
Performans İpuçları ve En İyi Pratikler
Athena kullanırken dikkat etmeniz gereken ve maliyeti doğrudan etkileyen birkaç kritik nokta var:
- SELECT * kullanmaktan kaçının: Sadece ihtiyacınız olan kolonları seçin. Parquet/ORC formatında bu maliyet ve performansı doğrudan etkiler.
- Partition pruning kullanın: WHERE clause’ınızda mutlaka partition kolonlarını filtreleyin. Yoksa Athena tüm S3 verisini tarar.
- Sorgu sonuçlarını cache’leyin: Athena aynı sorguyu 7 dakika içinde tekrar çalıştırırsanız cache’lenmiş sonucu döndürür, ücret ödemezsiniz.
- LIMIT kullanın geliştirme aşamasında: Sorguyu test ederken LIMIT 10 ekleyerek taranan veriyi sınırlayın.
- Küçük dosyaları birleştirin: S3’te çok sayıda küçük dosya (1 MB altı) Athena performansını ciddi düşürür. Dosyaları 128 MB-1 GB arasında tutmaya çalışın.
- Compression kullanın: Gzip, Snappy, LZ4 gibi compression algoritmaları hem depolama maliyetini hem de Athena tarama maliyetini düşürür.
- Workgroup query limiti: Geliştirici ekibiniz için ayrı bir workgroup oluşturun ve query başına tarama limitini 200-500 MB ile kısıtlayın.
Maliyet Takibi ve Optimizasyon
Athena maliyetlerini CloudWatch ile takip edebilirsiniz ama daha pratik bir yöntem Athena’nın kendi execution logs’unu kullanmak:
# Son 24 saatteki sorguların tarama miktarlarını listele
aws athena list-query-executions
--work-group production-workgroup
--max-results 50
--query 'QueryExecutionIds'
--output text | tr 't' 'n' | while read id; do
aws athena get-query-execution
--query-execution-id "$id"
--query '{
QueryExecutionId: QueryExecution.QueryExecutionId,
State: QueryExecution.Status.State,
DataScannedGB: QueryExecution.Statistics.DataScannedInBytes,
SubmissionTime: QueryExecution.Status.SubmissionDateTime
}'
--output json
done | python3 -c "
import sys, json
total_bytes = 0
for line in sys.stdin:
try:
d = json.loads(line)
bytes_scanned = d.get('DataScannedGB', 0) or 0
total_bytes += bytes_scanned
cost = bytes_scanned / (1024**3) * 5
print(f"{d['QueryExecutionId']}: {bytes_scanned/(1024**2):.1f} MB, ${cost:.4f}")
except:
pass
print(f'Toplam: {total_bytes/(1024**3):.2f} GB, Tahmini maliyet: ${total_bytes/(1024**3)*5:.2f}')
"
Sonuç
AWS Athena, sysadmin toolkit’inin en değerli araçlarından biri haline geldi. Sunucu yönetimi yok, cluster bakımı yok, sadece SQL yazıyorsunuz ve sonuç geliyor. Özellikle CloudTrail analizi, ALB log incelemesi ve uygulama log sorgulaması gibi operasyonel senaryolarda Athena’nın sağladığı hız ve esneklik ölçülemez değerde.
Başarının anahtarı birkaç temel prensipte yatıyor: verilerinizi Parquet formatında tutun, partition stratejinizi doğru belirleyin, Partition Projection kullanarak metadata overhead’ini ortadan kaldırın ve workgroup limitleriyle maliyeti kontrol altında tutun. Bu prensiplere uyduğunuzda hem çok hızlı sorgu sonuçları alırsınız hem de aylık Athena faturanız sizi şaşırtmaz.
Ekibinizde birisi “bu logu analiz etmek için ne kadar beklememiz gerekiyor?” diye sorduğunda, artık cevabınız “birkaç saniye” olacak. Bu farkı bir kez tattınız mı, eski yöntemlere dönmek istemedğinizi göreceksiniz.
