RabbitMQ ile Laravel ve PHP Entegrasyonu

Üretim ortamında bir e-ticaret sitesinin sipariş işleme sürecini yönetirken şunu fark ettim: senkron işlemler bir noktadan sonra gerçek bir darboğaz haline geliyor. Kullanıcı siparişi tamamlıyor, sistem stok güncelleme, fatura oluşturma, kargo entegrasyonu ve e-posta gönderimi için beklemeye alınıyor. Sonuç? Yavaş response süreleri, zaman zaman timeout hataları ve mutsuz kullanıcılar. RabbitMQ bu problemi köklü olarak çözdü. Bu yazıda Laravel ve vanilla PHP projeleriyle RabbitMQ entegrasyonunu, gerçek senaryolardan örneklerle aktaracağım.

RabbitMQ Nedir ve Neden Kullanmalısınız

RabbitMQ, AMQP (Advanced Message Queuing Protocol) protokolünü kullanan açık kaynaklı bir mesaj aracısıdır. Erlang ile yazılmıştır ve yüksek erişilebilirlik konusunda güçlü bir geçmişe sahiptir. Publisher/Consumer mimarisinde çalışır: bir taraf mesajı kuyruğa bırakır, diğer taraf bu mesajı alıp işler.

Şunu somut bir örnekle düşünelim. Bir kullanıcı sipariş verdiğinde:

  • Sipariş veritabanına kaydedilir (senkron, hemen olmalı)
  • Fatura PDF oluşturulur (asenkron olabilir)
  • Kargo firması API’sine bildirim gönderilir (asenkron olabilir)
  • Müşteriye onay maili gönderilir (asenkron olabilir)
  • Muhasebe sistemine kayıt düşülür (asenkron olabilir)

Bu dört işlemi asenkrona taşıdığınızda kullanıcı yanıt süresi dramatik biçimde düşüyor. RabbitMQ burada araya giriyor ve bu işleri güvenilir bir şekilde kuyruğa alıp, consumer’larınızın işlemesini sağlıyor.

Kurulum

Ubuntu/Debian Üzerinde RabbitMQ Kurulumu

# Erlang bağımlılığını kur
sudo apt-get install -y erlang

# RabbitMQ repository ekle
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash

# RabbitMQ kur
sudo apt-get install -y rabbitmq-server

# Servisi başlat ve otomatik başlatmayı etkinleştir
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

# Management plugin'i aktif et
sudo rabbitmq-plugins enable rabbitmq_management

# Admin kullanıcısı oluştur
sudo rabbitmqctl add_user admin guclu_sifre_buraya
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# Varsayılan guest kullanıcısını sil (güvenlik)
sudo rabbitmqctl delete_user guest

Management UI’a http://sunucu-ip:15672 adresinden ulaşabilirsiniz. Bu arayüz production’da gerçekten hayat kurtarıcı; kuyruk derinliklerini, mesaj oranlarını ve consumer durumlarını anlık izleyebiliyorsunuz.

Docker ile Hızlı Kurulum

Geliştirme ortamı için Docker çok daha pratik:

docker run -d 
  --name rabbitmq 
  -p 5672:5672 
  -p 15672:15672 
  -e RABBITMQ_DEFAULT_USER=admin 
  -e RABBITMQ_DEFAULT_PASS=guclu_sifre 
  -v rabbitmq_data:/var/lib/rabbitmq 
  rabbitmq:3-management

PHP AMQP Kütüphanesi Kurulumu

PHP tarafında php-amqplib kütüphanesini kullanacağız. Bu kütüphane sektörde en yaygın kullanılan ve aktif olarak geliştirilen seçenek.

composer require php-amqplib/php-amqplib

Laravel için ise vladimir-yuldashev/laravel-queue-rabbitmq paketi tercih edilen yöntem. Bu paket Laravel’in built-in queue sistemine RabbitMQ driver’ı ekliyor, yani mevcut dispatch() ve Queue::push() kullanımlarınızı değiştirmenize gerek kalmıyor.

composer require vladimir-yuldashev/laravel-queue-rabbitmq

Laravel Entegrasyonu

Konfigürasyon

config/queue.php dosyasına RabbitMQ connection bilgilerini ekleyin:

'connections' => [
    // ... mevcut connectionlar

    'rabbitmq' => [
        'driver' => 'rabbitmq',
        'queue' => env('RABBITMQ_QUEUE', 'default'),
        'connection' => PhpAmqpLibConnectionAMQPLazyConnection::class,

        'hosts' => [
            [
                'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                'port' => env('RABBITMQ_PORT', 5672),
                'user' => env('RABBITMQ_USER', 'admin'),
                'password' => env('RABBITMQ_PASSWORD', ''),
                'vhost' => env('RABBITMQ_VHOST', '/'),
            ],
        ],

        'options' => [
            'ssl_options' => [
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],
            'queue' => [
                'job' => VladimirYuldashevLaravelQueueRabbitMQQueueJobsRabbitMQJob::class,
            ],
        ],
    ],
],

.env dosyanıza da ekleyin:

QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_USER=admin
RABBITMQ_PASSWORD=guclu_sifre
RABBITMQ_VHOST=/
RABBITMQ_QUEUE=default

Job Oluşturma ve Dispatch

Sipariş onay maili için bir job oluşturalım:

php artisan make:job SendOrderConfirmationEmail
<?php

namespace AppJobs;

use AppModelsOrder;
use AppMailOrderConfirmation;
use IlluminateBusQueueable;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
use IlluminateQueueInteractsWithQueue;
use IlluminateQueueSerializesModels;
use IlluminateSupportFacadesMail;
use IlluminateSupportFacadesLog;

class SendOrderConfirmationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;
    public int $backoff = 60; // saniye cinsinden retry bekleme süresi

    public function __construct(
        private readonly Order $order
    ) {}

    public function handle(): void
    {
        Mail::to($this->order->customer_email)
            ->send(new OrderConfirmation($this->order));

        Log::info('Sipariş onay maili gönderildi', [
            'order_id' => $this->order->id,
            'email' => $this->order->customer_email
        ]);
    }

    public function failed(Throwable $exception): void
    {
        Log::error('Sipariş onay maili gönderilemedi', [
            'order_id' => $this->order->id,
            'error' => $exception->getMessage()
        ]);

        // Burada alarm sistemine bildirim gönderilebilir
    }
}

Controller’dan dispatch etmek ise son derece temiz:

<?php

namespace AppHttpControllers;

use AppJobsSendOrderConfirmationEmail;
use AppJobsGenerateInvoicePdf;
use AppJobsNotifyShippingProvider;
use AppModelsOrder;

class OrderController extends Controller
{
    public function store(OrderRequest $request)
    {
        $order = Order::create($request->validated());

        // Bu işlemler kuyruğa alınır, kullanıcı beklemez
        SendOrderConfirmationEmail::dispatch($order)
            ->onQueue('emails');

        GenerateInvoicePdf::dispatch($order)
            ->onQueue('invoices')
            ->delay(now()->addSeconds(5));

        NotifyShippingProvider::dispatch($order)
            ->onQueue('integrations');

        return response()->json([
            'message' => 'Siparişiniz alındı',
            'order_id' => $order->id
        ], 201);
    }
}

Worker’ı başlatmak için:

# Tek queue için
php artisan queue:work rabbitmq --queue=emails

# Öncelikli sırayla birden fazla queue
php artisan queue:work rabbitmq --queue=integrations,emails,invoices

# Supervisor ile production'da yönetmek için
php artisan queue:work rabbitmq --sleep=3 --tries=3 --max-time=3600

Vanilla PHP ile RabbitMQ Kullanımı

Laravel kullanmayan projelerde, hatta servis mesh mimarilerinde bare-metal PHP ile RabbitMQ kullanmak gerekebilir. Bunun için php-amqplib kütüphanesini doğrudan kullanırız.

Publisher (Mesaj Gönderici)

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibExchangeAMQPExchangeType;

class OrderEventPublisher
{
    private $connection;
    private $channel;

    public function __construct(
        private string $host,
        private int $port,
        private string $user,
        private string $password
    ) {
        $this->connection = new AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->user,
            $this->password
        );
        $this->channel = $this->connection->channel();

        // Exchange tanımla (topic tipi, routing key'e göre yönlendirme yapılabilir)
        $this->channel->exchange_declare(
            'order_events',
            AMQPExchangeType::TOPIC,
            false,  // passive
            true,   // durable (broker restart sonrası kalıcı)
            false   // auto-delete
        );
    }

    public function publish(string $routingKey, array $data): void
    {
        $messageBody = json_encode([
            'data' => $data,
            'timestamp' => time(),
            'version' => '1.0'
        ]);

        $message = new AMQPMessage(
            $messageBody,
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // disk'e yaz
            ]
        );

        $this->channel->basic_publish(
            $message,
            'order_events',
            $routingKey
        );

        echo "Mesaj gönderildi: {$routingKey}n";
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Kullanım
$publisher = new OrderEventPublisher('localhost', 5672, 'admin', 'sifre');
$publisher->publish('order.created', [
    'order_id' => 12345,
    'customer_id' => 678,
    'total' => 249.90
]);
$publisher->publish('order.payment.confirmed', [
    'order_id' => 12345,
    'payment_method' => 'credit_card'
]);

Consumer (Mesaj Tüketici)

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class OrderEventConsumer
{
    private $connection;
    private $channel;

    public function __construct(
        private string $host,
        private int $port,
        private string $user,
        private string $password
    ) {
        $this->connection = new AMQPStreamConnection(
            $this->host, $this->port,
            $this->user, $this->password,
            '/',
            false, 'AMQPLAIN', null, 'en_US',
            3.0,  // connection timeout
            3.0,  // read/write timeout
            null, false,
            60    // heartbeat
        );

        $this->channel = $this->connection->channel();

        // Queue oluştur ve exchange'e bağla
        $this->channel->queue_declare(
            'email_notifications',
            false,  // passive
            true,   // durable
            false,  // exclusive
            false   // auto-delete
        );

        // Topic pattern ile binding: order.* pattern'ı tüm order eventlerini yakalar
        $this->channel->queue_bind(
            'email_notifications',
            'order_events',
            'order.*'
        );

        // QoS: bir seferde kaç mesaj alacağımızı belirle
        // Bu ayar consumer'ların yüke göre dengelenmesini sağlar
        $this->channel->basic_qos(null, 1, null);
    }

    public function consume(): void
    {
        echo "Consumer başladı, mesaj bekleniyor...n";

        $this->channel->basic_consume(
            'email_notifications',
            '',     // consumer tag
            false,  // no-local
            false,  // no-ack (false = manual ack)
            false,  // exclusive
            false,  // no-wait
            function (AMQPMessage $message) {
                $this->processMessage($message);
            }
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    private function processMessage(AMQPMessage $message): void
    {
        try {
            $data = json_decode($message->getBody(), true);
            $routingKey = $message->getRoutingKey();

            echo "İşleniyor: {$routingKey}n";

            // İş mantığı burada
            match ($routingKey) {
                'order.created' => $this->sendWelcomeEmail($data['data']),
                'order.payment.confirmed' => $this->sendPaymentConfirmation($data['data']),
                default => $this->logUnhandledEvent($routingKey, $data)
            };

            // Başarılı işlemi onayla
            $message->ack();

        } catch (Exception $e) {
            error_log("Mesaj işleme hatası: " . $e->getMessage());

            // Yeniden kuyruğa al (requeue: true)
            // Kalıcı hata durumunda false yaparak dead-letter queue'ya düşürün
            $message->nack(true);
        }
    }

    private function sendWelcomeEmail(array $orderData): void
    {
        // Mail gönderme mantığı
        echo "Sipariş oluşturma maili gönderildi: Order #{$orderData['order_id']}n";
    }

    private function sendPaymentConfirmation(array $orderData): void
    {
        echo "Ödeme onay maili gönderildi: Order #{$orderData['order_id']}n";
    }

    private function logUnhandledEvent(string $key, array $data): void
    {
        error_log("Bilinmeyen event: {$key}");
    }
}

$consumer = new OrderEventConsumer('localhost', 5672, 'admin', 'sifre');
$consumer->consume();

Dead Letter Queue Konfigürasyonu

Birçok projede atladığım ve sonradan acı çektiğim konu: dead-letter queue’lar. Bir mesaj işlenemediğinde nereye gidecek? Cevaplamazsanız mesajlar kaybolur.

// Queue'yu dead-letter exchange ile birlikte oluşturma
$this->channel->exchange_declare('dead_letter_exchange', 'direct', false, true, false);
$this->channel->queue_declare('failed_jobs', false, true, false, false);
$this->channel->queue_bind('failed_jobs', 'dead_letter_exchange', 'failed');

// Ana queue'yu DLX ile tanımla
$this->channel->queue_declare(
    'email_notifications',
    false,
    true,
    false,
    false,
    false,
    new PhpAmqpLibWireAMQPTable([
        'x-dead-letter-exchange' => 'dead_letter_exchange',
        'x-dead-letter-routing-key' => 'failed',
        'x-message-ttl' => 86400000, // 24 saat (ms cinsinden)
    ])
);

Bu konfigürasyon ile işlenemeyen mesajlar failed_jobs kuyruğuna düşüyor. Oradan manuel inceleme veya otomatik tekrar işleme yapabilirsiniz.

Supervisor ile Production’da Worker Yönetimi

Worker süreçlerinin production’da güvenilir çalışması için Supervisor şart:

sudo apt-get install -y supervisor
sudo nano /etc/supervisor/conf.d/laravel-worker.conf
[program:laravel-email-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work rabbitmq --queue=emails --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=3
redirect_stderr=true
stdout_logfile=/var/log/supervisor/email-worker.log
stopwaitsecs=3600

[program:laravel-integration-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work rabbitmq --queue=integrations --sleep=3 --tries=5 --max-time=3600
autostart=true
autorestart=true
user=www-data
numprocs=2
redirect_stderr=true
stdout_logfile=/var/log/supervisor/integration-worker.log
stopwaitsecs=3600
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start all
sudo supervisorctl status

İzleme ve Alerting

Management API’yi kullanarak basit bir sağlık kontrol scripti:

<?php

function checkRabbitMQHealth(string $host, string $user, string $password): array
{
    $url = "http://{$host}:15672/api/queues";
    
    $context = stream_context_create([
        'http' => [
            'header' => 'Authorization: Basic ' . base64_encode("{$user}:{$password}")
        ]
    ]);

    $response = file_get_contents($url, false, $context);
    $queues = json_decode($response, true);

    $alerts = [];
    foreach ($queues as $queue) {
        $messageCount = $queue['messages'] ?? 0;
        $consumerCount = $queue['consumers'] ?? 0;

        if ($messageCount > 1000) {
            $alerts[] = "UYARI: {$queue['name']} kuyruğunda {$messageCount} mesaj birikmiş!";
        }

        if ($consumerCount === 0 && $messageCount > 0) {
            $alerts[] = "KRİTİK: {$queue['name']} kuyruğunda consumer yok ama {$messageCount} mesaj var!";
        }
    }

    return $alerts;
}

$alerts = checkRabbitMQHealth('localhost', 'admin', 'sifre');
foreach ($alerts as $alert) {
    echo $alert . "n";
    // Slack, PagerDuty vs. bildirim gönderin
}

Sonuç

RabbitMQ ve PHP/Laravel entegrasyonu ilk bakışta karmaşık görünse de doğru kurulduğunda sisteminize ciddi bir esneklik katıyor. Birkaç noktanın altını çizmek istiyorum:

  • Dead-letter queue’ları mutlaka tanımlayın: Aksi halde sorunlu mesajlar kaybolur, sessiz veri kaybı yaşarsınız.
  • Manuel acknowledgement kullanın: no-ack: true kolaycılığına kaçmayın, işlenemeyen mesajları kaybedersiniz.
  • QoS ayarlarını ihmal etmeyin: basic_qos(null, 1, null) consumer’lar arasında adil yük dağılımını sağlar.
  • Heartbeat’i konfigüre edin: Uzun süren işlemlerde connection kopabilir, heartbeat bunu önler.
  • Worker’larınızı Supervisor ile yönetin: Elle başlatılan worker’lar kaçınılmaz olarak bir gün ölür ve fark edilmez.

Management UI’ı düzenli takip edin. Kuyruk derinlikleri artmaya başladığında ya consumer sayısını artırmanız, ya iş mantığını optimize etmeniz ya da işleme kapasitesini gözden geçirmeniz gerekiyor. Bu metrikleri erken yakalamak, gece yarısı telefon almaktan çok daha iyi.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir