GCP Cloud Tasks ile Görev Kuyruğu Yönetimi
Bir web uygulaması geliştirdiğinizi düşünün. Kullanıcı kayıt olduğunda e-posta göndereceksiniz, fatura oluşturacaksınız, analitik veriler işleyeceksiniz. Tüm bunları HTTP isteğinin içinde yapmaya çalışırsanız kullanıcı 10-15 saniye bekler ve “Bu site neden bu kadar yavaş?” diye düşünür. İşte tam bu noktada görev kuyruğu sistemleri devreye giriyor. GCP Cloud Tasks, bu tür asenkron iş yüklerini yönetmek için Google’ın sunduğu tam yönetilen bir görev kuyruğu servisidir.
Cloud Tasks Nedir ve Neden Kullanmalısınız?
Cloud Tasks, uygulamanızın dışında yürütülmesi gereken işleri yönetmenizi sağlayan bir mesaj kuyruğu servisidir. Pub/Sub’dan farklı olarak Cloud Tasks, HTTP hedef görevlere odaklanır yani bir görevi bir URL’ye HTTP isteği olarak gönderir ve bu isteğin başarılı olup olmadığını takip eder.
Neden Cloud Tasks tercih edersiniz?
- Garantili teslimat: Görev başarısız olursa otomatik olarak yeniden dener
- Hız sınırlama: Hedefe gönderilen istek sayısını kontrol edebilirsiniz
- Zamanlama: Görevi belirli bir zamana kadar geciktirebilirsiniz
- Tekillik: Aynı görevi yanlışlıkla iki kez eklememek için kontrol mekanizmaları
- Görünürlük: Cloud Console üzerinden kuyruk durumunu izleyebilirsiniz
Gerçek dünya senaryosu olarak şunu düşünün: E-ticaret platformunuzda günde 50.000 sipariş geliyor. Her siparişten sonra kargo firmasının API’sine bildirim göndermeniz gerekiyor. Kargo API’si bazen yavaşlıyor veya hata veriyor. Cloud Tasks olmadan bu hataları elle yönetmek, yeniden denemeler yazmak ciddi iş yükü oluşturur. Cloud Tasks ile sadece görevi kuyruğa eklersiniz, gerisi otomatik.
Temel Kavramlar
Cloud Tasks’i anlamak için birkaç temel kavramı oturtmak gerekiyor.
Queue (Kuyruk): Görevlerin bekletildiği yapı. Her kuyruğun kendi hız limitleri ve yeniden deneme politikaları vardır.
Task (Görev): Kuyruğa eklenen bir iş birimi. HTTP isteği olarak tanımlanır.
Worker (İşçi): Görevi alan ve işleyen servis. Cloud Run, App Engine veya herhangi bir HTTP endpoint olabilir.
Dispatch rate: Kuyruğun görevleri worker’a ne hızda gönderdiği.
Retry policy: Görev başarısız olduğunda nasıl yeniden deneneceği.
Kurulum ve Hazırlık
Öncelikle gerekli araçları ve izinleri ayarlayalım.
# gcloud CLI kurulumu sonrası proje ayarı
gcloud config set project YOUR_PROJECT_ID
# Cloud Tasks API'yi etkinleştir
gcloud services enable cloudtasks.googleapis.com
# Gerekli IAM rollerini kontrol et
gcloud projects get-iam-policy YOUR_PROJECT_ID
--flatten="bindings[].members"
--format="table(bindings.role)"
--filter="bindings.members:YOUR_SERVICE_ACCOUNT"
Service account için gerekli roller:
- roles/cloudtasks.enqueuer: Görevi kuyruğa ekleyebilir
- roles/cloudtasks.viewer: Kuyruk ve görev bilgilerini okuyabilir
- roles/cloudtasks.admin: Tam yönetim yetkisi
# Service account'a enqueuer rolü ver
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID
--member="serviceAccount:my-app@YOUR_PROJECT_ID.iam.gserviceaccount.com"
--role="roles/cloudtasks.enqueuer"
Kuyruk Oluşturma
Temel bir kuyruk oluşturmak oldukça basit:
# Basit kuyruk oluşturma
gcloud tasks queues create my-first-queue
--location=europe-west1
# Detaylı kuyruk oluşturma
gcloud tasks queues create order-notification-queue
--location=europe-west1
--max-dispatches-per-second=100
--max-concurrent-dispatches=50
--max-attempts=5
--min-backoff=10s
--max-backoff=300s
--max-doublings=4
Buradaki parametreleri açıklayalım:
- –max-dispatches-per-second: Saniyede kaç görev gönderileceği, burada 100
- –max-concurrent-dispatches: Aynı anda kaç görev işlenebileceği, burada 50
- –max-attempts: Başarısız görev için toplam deneme sayısı
- –min-backoff: İlk yeniden deneme öncesi bekleme süresi
- –max-backoff: Maksimum bekleme süresi
- –max-doublings: Backoff süresinin kaç kez iki katına çıkacağı
Kuyruk bilgilerini görüntülemek için:
# Tüm kuyrukları listele
gcloud tasks queues list --location=europe-west1
# Belirli kuyruğun detaylarını gör
gcloud tasks queues describe order-notification-queue
--location=europe-west1
Göreve Ekleme: HTTP Görevleri
Cloud Tasks’in en çok kullanılan özelliği HTTP görevleridir. Bir worker servisine HTTP isteği gönderirsiniz.
Python ile Görev Ekleme
Gerçek bir senaryo: Kullanıcı sipariş verdiğinde fatura gönderme görevini kuyruğa ekleyelim.
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import json
import datetime
def create_order_notification_task(order_id: str, customer_email: str, delay_seconds: int = 0):
"""
Sipariş bildirimi için Cloud Tasks görevi oluşturur.
Args:
order_id: Sipariş ID
customer_email: Müşteri e-posta adresi
delay_seconds: Görevin kaç saniye sonra işleneceği
"""
client = tasks_v2.CloudTasksClient()
project = "my-ecommerce-project"
location = "europe-west1"
queue = "order-notification-queue"
url = "https://my-worker-service-xyz.run.app/process-order-notification"
parent = client.queue_path(project, location, queue)
# Görev gövdesi
payload = {
"order_id": order_id,
"customer_email": customer_email,
"action": "send_invoice",
"timestamp": datetime.datetime.utcnow().isoformat()
}
task = {
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
"url": url,
"headers": {
"Content-Type": "application/json",
"X-Custom-Header": "cloud-tasks"
},
"body": json.dumps(payload).encode(),
"oidc_token": {
"service_account_email": f"my-app@{project}.iam.gserviceaccount.com"
}
}
}
# Gecikme varsa zamanı ayarla
if delay_seconds > 0:
scheduled_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay_seconds)
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(scheduled_time)
task["schedule_time"] = timestamp
response = client.create_task(request={"parent": parent, "task": task})
print(f"Görev oluşturuldu: {response.name}")
return response
# Kullanım örneği
create_order_notification_task(
order_id="ORD-2024-001234",
customer_email="[email protected]",
delay_seconds=0 # Hemen işle
)
# 5 dakika sonra hatırlatma gönder
create_order_notification_task(
order_id="ORD-2024-001234",
customer_email="[email protected]",
delay_seconds=300
)
Worker Servisi Oluşturma
Cloud Run üzerinde çalışacak basit bir worker servisi:
from flask import Flask, request, jsonify
import json
import logging
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
@app.route('/process-order-notification', methods=['POST'])
def process_order_notification():
"""
Cloud Tasks'ten gelen sipariş bildirim görevlerini işler.
"""
# Cloud Tasks bazı özel başlıklar gönderir
task_name = request.headers.get('X-CloudTasks-TaskName')
queue_name = request.headers.get('X-CloudTasks-QueueName')
retry_count = request.headers.get('X-CloudTasks-TaskRetryCount', '0')
logging.info(f"Görev alındı: {task_name}, Kuyruk: {queue_name}, Deneme: {retry_count}")
try:
payload = request.get_json()
order_id = payload['order_id']
customer_email = payload['customer_email']
action = payload['action']
if action == 'send_invoice':
# Fatura gönderme işlemi
send_invoice_email(order_id, customer_email)
logging.info(f"Fatura gönderildi: {order_id} -> {customer_email}")
# 2xx döndürmek görevi başarılı sayar
return jsonify({"status": "success", "order_id": order_id}), 200
except Exception as e:
logging.error(f"Görev işlenirken hata: {str(e)}")
# 5xx döndürmek görevi başarısız sayar ve yeniden denenir
return jsonify({"error": str(e)}), 500
def send_invoice_email(order_id: str, email: str):
# Gerçek e-posta gönderme mantığı buraya
logging.info(f"E-posta gönderiliyor: {order_id} için {email}")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Terraform ile Altyapı Yönetimi
Üretim ortamında Cloud Tasks kuyruklarını Terraform ile yönetmek en sağlıklı yaklaşım. Böylece altyapı kodunuzla birlikte versiyonlanır.
# cloud_tasks.tf
resource "google_cloud_tasks_queue" "order_notification" {
name = "order-notification-queue"
location = "europe-west1"
project = var.project_id
rate_limits {
max_dispatches_per_second = 100
max_concurrent_dispatches = 50
}
retry_config {
max_attempts = 5
min_backoff = "10s"
max_backoff = "300s"
max_doublings = 4
}
stackdriver_logging_config {
sampling_ratio = 1.0 # Tüm görevleri logla
}
}
resource "google_cloud_tasks_queue" "email_queue" {
name = "email-queue"
location = "europe-west1"
project = var.project_id
rate_limits {
# E-posta servisi limitlerini aşmamak için düşük tutuyoruz
max_dispatches_per_second = 10
max_concurrent_dispatches = 5
}
retry_config {
max_attempts = 10
min_backoff = "30s"
max_backoff = "600s"
max_doublings = 5
}
}
# Uygulama servis hesabına gerekli izinler
resource "google_project_iam_member" "task_enqueuer" {
project = var.project_id
role = "roles/cloudtasks.enqueuer"
member = "serviceAccount:${google_service_account.app.email}"
}
İzleme ve Gözlemlenebilirlik
Üretim sisteminde kuyruğun sağlıklı çalıştığını doğrulamak kritik önem taşır.
# Kuyruk istatistiklerini görüntüle
gcloud tasks queues describe order-notification-queue
--location=europe-west1
--format="yaml"
# Kuyruktaki görevleri listele (ilk 100)
gcloud tasks list
--queue=order-notification-queue
--location=europe-west1
# Belirli bir görevi detaylı gör
gcloud tasks describe TASK_ID
--queue=order-notification-queue
--location=europe-west1
Cloud Monitoring ile alarm kurmak için:
# Yüksek görev sayısı için alarm oluştur
gcloud alpha monitoring policies create
--notification-channels=CHANNEL_ID
--display-name="Cloud Tasks Yüksek Görev Sayısı"
--condition-display-name="Kuyruk doldu"
--condition-filter='resource.type="cloudtasks.googleapis.com/Queue" AND metric.type="cloudtasks.googleapis.com/queue/depth"'
--condition-threshold-value=10000
--condition-threshold-comparison=COMPARISON_GT
--condition-duration=300s
Önemli izleme metrikleri:
- cloudtasks.googleapis.com/queue/depth: Kuyruktaki bekleyen görev sayısı
- cloudtasks.googleapis.com/queue/task_attempt_count: Görev deneme sayısı
- cloudtasks.googleapis.com/queue/task_attempt_failures: Başarısız deneme sayısı
Gerçek Dünya Senaryosu: Toplu E-posta Kampanyası
10.000 kullanıcıya kampanya e-postası göndereceğiz. Hepsini aynı anda göndermek ne e-posta servisimizi hem de karşı sunucuyu ezer. Cloud Tasks ile kontrollü şekilde göndelim:
import time
from google.cloud import tasks_v2
import json
def schedule_bulk_email_campaign(campaign_id: str, user_list: list):
"""
10.000 kullanıcıya kademeli e-posta kampanyası gönderir.
Saatte 1000 e-posta olmak üzere 10 saatte tamamlar.
"""
client = tasks_v2.CloudTasksClient()
project = "my-project"
location = "europe-west1"
queue = "email-queue"
parent = client.queue_path(project, location, queue)
# Her 1000 kullanıcı için 1 saat gecikme ekle
batch_size = 1000
delay_hours = 0
tasks_created = 0
for i in range(0, len(user_list), batch_size):
batch = user_list[i:i + batch_size]
for j, user in enumerate(batch):
# Her göreve kademeli gecikme ekle (saniye cinsinden)
# 1000 kullanıcıyı 1 saate yay: her biri 3.6 saniye arayla
delay_seconds = (delay_hours * 3600) + (j * 3.6)
payload = {
"campaign_id": campaign_id,
"user_id": user["id"],
"email": user["email"],
"name": user["name"]
}
from google.protobuf import timestamp_pb2
import datetime
scheduled_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay_seconds)
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(scheduled_time)
task = {
"schedule_time": timestamp,
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
"url": "https://worker.example.com/send-campaign-email",
"headers": {"Content-Type": "application/json"},
"body": json.dumps(payload).encode(),
"oidc_token": {
"service_account_email": f"app@{project}.iam.gserviceaccount.com"
}
}
}
client.create_task(request={"parent": parent, "task": task})
tasks_created += 1
delay_hours += 1
print(f"Batch {i//batch_size + 1} planlandı. Toplam görev: {tasks_created}")
# API rate limit aşmamak için kısa bekleme
time.sleep(0.1)
print(f"Kampanya planlandı! Toplam {tasks_created} görev oluşturuldu.")
return tasks_created
Güvenlik Konuları
Cloud Tasks kullanırken güvenliği doğru yapılandırmak kritik. Worker endpoint’iniz herkese açık olmamalı, sadece Cloud Tasks’ten gelen istekleri kabul etmeli.
OIDC Token Doğrulama kullanarak bunu sağlayabilirsiniz. Görev oluştururken oidc_token alanını doldurursunuz, Cloud Tasks bu service account adına bir JWT token üretir ve isteğe ekler. Worker bu token’ı doğrular.
Cloud Run kullanıyorsanız --no-allow-unauthenticated flag’i ile yalnızca kimliği doğrulanmış isteklere izin verin:
# Cloud Run servisini sadece kimliği doğrulanmış istekler için deploy et
gcloud run deploy order-worker
--image=gcr.io/my-project/order-worker:latest
--platform=managed
--region=europe-west1
--no-allow-unauthenticated
[email protected]
# Cloud Tasks service account'una Cloud Run Invoker rolü ver
gcloud run services add-iam-policy-binding order-worker
--region=europe-west1
--member="serviceAccount:[email protected]"
--role="roles/run.invoker"
Sorun Giderme İpuçları
Üretimde karşılaşılan yaygın sorunlar ve çözümleri:
Görevler sürekli yeniden deneniyor: Worker’ınız 2xx dışı bir kod dönüyor demektir. Worker loglarına bakın. En yaygın sebep, worker’ın işlemi tamamlamadan zaman aşımına uğramasıdır.
# Worker loglarını kontrol et
gcloud logging read
'resource.type="cloud_run_revision" AND severity>=ERROR'
--limit=50
--format="table(timestamp,textPayload)"
Görevler çok yavaş işleniyor: Kuyruğun max-dispatches-per-second değeri düşük olabilir. Artırın:
gcloud tasks queues update order-notification-queue
--location=europe-west1
--max-dispatches-per-second=200
--max-concurrent-dispatches=100
Görev yanlışlıkla iki kez işleniyor: Worker’ınızın idempotent olduğundan emin olun. Network sorunları nedeniyle Cloud Tasks bazen görevi yeniden gönderebilir. Veritabanında işlenmiş görev ID’lerini saklayarak tekrarlı işlemleri önleyin.
OIDC token doğrulama hatası: Service account’un Cloud Run Invoker rolüne sahip olduğunu doğrulayın ve OIDC audience değerinin worker URL’siyle tam olarak eşleştiğini kontrol edin.
# Service account izinlerini kontrol et
gcloud run services get-iam-policy order-worker
--region=europe-west1
Maliyet Optimizasyonu
Cloud Tasks fiyatlandırması görev sayısına göre belirlenir. İlk 1 milyon görev aylık ücretsizdir. Bunun ötesinde 1 milyon görev başına yaklaşık 0,40 USD ödersiniz.
Maliyeti optimize etmek için:
- Gereksiz yeniden denemelerden kaçının; worker’ınızı sağlam tutun
- Birden fazla küçük görevi mantıksal olarak birleştirip tek görev yapabilirsiniz
- Test ortamınızda Cloud Tasks kullanıyorsanız kuyruğu duraklatın
# Geliştirme ortamında kuyruğu duraklat
gcloud tasks queues pause order-notification-queue
--location=europe-west1
# Tekrar başlat
gcloud tasks queues resume order-notification-queue
--location=europe-west1
Sonuç
GCP Cloud Tasks, asenkron iş yüklerini yönetmek için olgun ve güvenilir bir platform. E-posta göndermekten fatura oluşturmaya, veri işlemeden üçüncü parti API çağrılarına kadar pek çok senaryoda kullanabilirsiniz.
Temel çıkarımlar:
- Kullanıcıyı bekletecek her işlemi kuyruğa taşıyın
- Worker’larınızı her zaman idempotent tasarlayın, aynı görevin iki kez işlenmesi sorun çıkarmamalı
- Yeniden deneme politikalarını işinizin doğasına göre ayarlayın
- Güvenliği es geçmeyin, OIDC ile worker’larınızı koruyun
- İzlemeye yatırım yapın, kuyruk derinliği ve hata oranları için alarmlar kurun
- Terraform ile kuyruklarınızı kod olarak yönetin
Cloud Tasks ile başlangıç yapmak gerçekten kolay. İlk kuyruğunuzu birkaç dakikada oluşturabilir, Python veya Go istemcisiyle görev eklemeye başlayabilirsiniz. Zamanla uygulamanızın hangi iş yüklerinin asenkron yapılabileceğini görür ve sistemi buna göre genişletirsiniz. Sonuç olarak hem daha hızlı bir kullanıcı deneyimi hem de daha dayanıklı bir mimari elde edersiniz.
