websocket ve rabbitmq ile realtime broadcast

bir süre önce elektrikli araç şarj cihazlarının yönetildiği bir proje üzerinde çalışıyordum. o projede kullanıcılara aktif şarj bilgilerini anlık olarak nasıl göstereceğimiz konusu gündeme gelecekti.

ben de daha önce çalıştığım projelerde websocket + rabbitmq ile çoklu pod üzerinde çalışan bir realtime broadcast yapısı kurmamıştım. bu yüzden önce küçük bir poc yapmak istedim. zamanı geldiğinde “bunu böyle çözebiliriz” diyebileceğim çalışan bir örnek elimde olursa güzel olurdu.

bu problemi birkaç farklı yolla çözebilirdik. mobil uygulama belli aralıklarla http isteği atabilirdi, signalr kullanılabilirdi veya native websocket bağlantısı kurulabilirdi.

ben bu örnekte native websocket kullanmayı tercih ettim. signalr da gayet mantıklı bir alternatifti, özellikle backend tarafında group yönetimi gibi işleri kolaylaştırıyordu. fakat bu senaryoda daha hafif bir yapı kurmak istedim. native websocket ile bağlantı yönetimi, room/channel mantığı ve mesaj publish etme tarafında kontrol biraz daha bende kalacaktı.

ilk versiyon: memory üzerinde room yönetimi

ilk denemede çok basit bir websocket yapısı kurdum. client bağlanıyor, anlık mesajları alıyor, kullanıcı deneyimi gayet iyi görünüyordu.

fakat aynı charge point id’ye, yani aynı channel’a, birden fazla client bağlanınca problem ortaya çıktı. yeni bağlantı geldiğinde eski bağlantı düşüyordu.

aslında ihtiyaç biraz chatroom mantığına benziyordu. aynı odaya birden fazla client bağlanabilmeli ve o odaya gelen mesaj herkes tarafından alınabilmeliydi.

bunun için memory üzerinde room bazlı bir yapı kurdum. oda adını ve o odaya bağlı websocket bağlantılarını tutmak için iç içe `ConcurrentDictionary` kullandım.

private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, WebSocket>> _rooms = new();

her websocket bağlantısında ilgili odaya yeni bir socket ekleniyordu.

public void AddSocket(string room, string id, WebSocket socket)
{
    var roomSockets = _rooms.GetOrAdd(room, _ => new ConcurrentDictionary<string, WebSocket>());
    roomSockets.TryAdd(id, socket);
}

bağlantı kapandığında socket’i odadan silmek gerekiyordu. oda tamamen boşaldıysa dış dictionary’den de temizlenmeliydi.

public async Task RemoveSocket(string room, string id)
{
    if (!_rooms.TryGetValue(room, out var roomSockets) || !roomSockets.TryRemove(id, out var socket))
        return;

    if (socket.State == WebSocketState.Open)
        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed by manager", CancellationToken.None);

    if (roomSockets.IsEmpty)
        _rooms.TryRemove(room, out _);
}

broadcast tarafında ise ilgili odadaki tüm socket’lere mesaj gönderiliyor, kapanmış veya hata veren socket’ler temizleniyordu.

public async Task BroadcastMessage(string room, string message)
{
    if (!_rooms.TryGetValue(room, out var roomSockets))
        return;

    var buffer = Encoding.UTF8.GetBytes(message);

    var tasks = roomSockets.Select(async pair =>
    {
        var socket = pair.Value;

        if (socket.State != WebSocketState.Open)
        {
            await RemoveSocket(room, pair.Key);
            return;
        }

        try
        {
            await socket.SendAsync(
                new ArraySegment<byte>(buffer),
                WebSocketMessageType.Text,
                true,
                CancellationToken.None);
        }
        catch (WebSocketException)
        {
            await RemoveSocket(room, pair.Key);
        }
    });

    await Task.WhenAll(tasks);
}

bu yapı tek pod üzerinde çalışıyordu. bağlantılar memory’de tutuluyordu ve manager class singleton olarak çalışıyordu. fakat production ortamında uygulamanın tek pod olarak çalışmayacağını biliyordum.

uygulamayı birden fazla pod ile ayağa kaldırınca beklenen problem ortaya çıktı: bir pod’a gelen mesaj, diğer pod’daki websocket bağlantılarına ulaşmıyordu.

yani memory üzerinde tuttuğum room yapısı pod içinde çalışıyordu ama pod’lar arasında hiçbir anlam ifade etmiyordu.

çoklu pod problemi

buradaki ihtiyaç şuydu: bir room’a mesaj geldiğinde, o room’a hangi pod üzerinde bağlı olursa olsun tüm websocket client’lara mesaj iletilmeliydi.

bunun için pod’lar arasında mesaj dağıtacak bir pub/sub yapısına ihtiyacım vardı.

elimde iki mantıklı seçenek vardı: redis pub/sub veya rabbitmq.

redis’i ilk başta düşündüm. fakat projede redis’i daha çok distributed cache ve hızlı veri erişimi için kullanma kararı almıştık. bu yüzden mesaj dağıtımı tarafında rabbitmq ile ilerlemek daha doğru geldi.

rabbitmq tarafı: tek exchange, room bazlı routing key

ilk akla gelen çözüm her room için ayrı bir fanout exchange açmaktı. fakat bu iyi ölçeklenen bir yapı değildi.

room sayısı arttıkça broker üzerinde exchange sayısı da artacaktı. ayrıca her publish öncesi exchange declare etmek hot path üzerinde gereksiz bir round-trip anlamına geliyordu.

bunun yerine tek bir direct exchange kullandım. exchange adı sabitti, room adı ise routing key olarak kullanılıyordu.

mantık şu şekildeydi:

her pod, ilgilendiği room için kendi exclusive ve auto-delete queue’sunu açıyor. sonra bu queue’yu direct exchange’e `routingKey = room` olacak şekilde bind ediyor.

böylece bir room’a mesaj publish edildiğinde, o room’a subscribe olmuş tüm pod’ların queue’larına mesaj düşüyor. pod kapanırsa veya subscription dispose edilirse queue da broker tarafında otomatik temizleniyor.

exchange’i her seferinde declare etmemek için connection wrapper içinde küçük bir flag tuttum.

public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
    await _gate.WaitAsync(cancellationToken);

    try
    {
        if (_connection is null || !_connection.IsOpen)
        {
            if (_connection is not null)
                await _connection.DisposeAsync();

            _connection = await _factory.CreateConnectionAsync(cancellationToken);
            _exchangeDeclared = false;
        }

        var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

        if (!_exchangeDeclared)
        {
            await channel.ExchangeDeclareAsync(
                ExchangeName,
                ExchangeType.Direct,
                durable: false,
                autoDelete: false,
                cancellationToken: cancellationToken);

            _exchangeDeclared = true;
        }

        return channel;
    }
    finally
    {
        _gate.Release();
    }
}

subscription lifecycle

burada dikkat edilmesi gereken ikinci konu subscription lifecycle’dı.

bir room’a bağlı son client da bağlantısını kopardığında, o room için açılmış consumer ve queue’nun da kapanması gerekiyordu. aksi halde aktif olmayan room’lar için consumer’lar çalışmaya devam eder, broker üzerinde boş queue’lar birikirdi.

bunun için subscription’ı `IAsyncDisposable` olarak modelledim. son client bağlantısı koptuğunda subscription dispose ediliyor. channel kapanınca consumer da düşüyor, exclusive/auto-delete queue da broker tarafından temizleniyor.

public async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposed, 1) == 1)
        return;

    await _registration.DisposeAsync();
    await _channel.DisposeAsync();
}

aynı pod içinde aynı room’a birden fazla client bağlanırsa her client için ayrı queue açmak istemedim. bunun yerine pod başına room bazlı tek subscription tuttum ve o room’a kaç client’ın bağlı olduğunu saydım.

son client da ayrıldığında subscription kapatılıyordu.

public async Task UnsubscribeAsync(string room)
{
    RoomSubscription? toDispose = null;

    await _gate.WaitAsync();

    try
    {
        if (_rooms.TryGetValue(room, out var sub) && sub.RemoveListener() == 0)
        {
            _rooms.Remove(room);
            toDispose = sub;
        }
    }
    finally
    {
        _gate.Release();
    }

    if (toDispose is not null)
        await toDispose.DisposeAsync();
}

burada dispose işlemini lock dışında yapmak önemli. çünkü channel kapatma async ve nispeten yavaş bir işlem olabilir. bunu lock altında bekletmek gereksiz yere diğer subscribe/unsubscribe işlemlerini bloklayabilir.

sonuç

bu poc sonunda native websocket ile başlayan basit yapı, çoklu pod üzerinde çalışabilecek room bazlı bir realtime broadcast yapısına dönüştü.

tek pod içinde websocket bağlantılarını memory’de room bazlı tuttum. pod’lar arası mesaj dağıtımı için rabbitmq direct exchange kullandım. room adını routing key olarak ele aldım. her pod kendi ilgilendiği room’lar için exclusive/auto-delete queue açtı ve son client ayrıldığında subscription’ı temizledi.

bu yapı sayesinde aynı room’a bağlı client’lar farklı pod’larda olsalar bile aynı mesajı alabilir hale geldi.

bu çalışma benim için özellikle native websocket yönetimi, rabbitmq routing, subscription lifecycle, multi-pod senaryoları ve realtime mesaj dağıtımı açısından iyi bir pratik oldu.