building a websocket pub/sub structure with rabbitmq
some time ago, i was working on a project that managed electric vehicle charging devices. at some point, we were going to discuss how users could see their active charging session information in real time.
i had not built a realtime broadcast structure with websocket + rabbitmq in a multi-pod environment before, so i wanted to build a small poc first. that way, when the topic came up, i would have a working example and could say “we can solve it like this.”
there were a few ways to solve this problem. the mobile app could send http requests at regular intervals, we could use signalr, or we could open a native websocket connection.
for this example, i decided to use native websocket. signalr was also a reasonable alternative, especially because it makes group management easier on the backend. but for this scenario, i wanted to keep the structure lighter: with native websocket, i had more direct control over connection management, room/channel logic, and message publishing.
first version: room management in memory
in the first version, i built a very simple websocket structure. the client connected, realtime messages came in, and the user experience looked good.
but a problem appeared when more than one client connected to the same charge point id, which is effectively the same channel: each new connection dropped the previous one.
the actual need was closer to a chatroom structure. multiple clients should be able to connect to the same room, and everyone in that room should receive the incoming message.
for this, i built a room-based structure in memory, using nested `ConcurrentDictionary` objects that map each room name to the websocket connections in that room.
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, WebSocket>> _rooms = new();
on each websocket connection, a new socket was added to the related room.
public void AddSocket(string room, string id, WebSocket socket)
{
    var roomSockets = _rooms.GetOrAdd(room, _ => new ConcurrentDictionary<string, WebSocket>());
    roomSockets.TryAdd(id, socket);
}
when a connection was closed, the socket had to be removed from the room. if the room became completely empty, it also had to be removed from the outer dictionary.
public async Task RemoveSocket(string room, string id)
{
    if (!_rooms.TryGetValue(room, out var roomSockets) || !roomSockets.TryRemove(id, out var socket))
        return;
    if (socket.State == WebSocketState.Open)
        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed by manager", CancellationToken.None);
    if (roomSockets.IsEmpty)
        _rooms.TryRemove(room, out _);
}
on the broadcast side, the message was sent to all sockets in the related room, and closed or failed sockets were cleaned up.
public async Task BroadcastMessage(string room, string message)
{
    if (!_rooms.TryGetValue(room, out var roomSockets))
        return;
    var buffer = Encoding.UTF8.GetBytes(message);
    var tasks = roomSockets.Select(async pair =>
    {
        var socket = pair.Value;
        if (socket.State != WebSocketState.Open)
        {
            await RemoveSocket(room, pair.Key);
            return;
        }
        try
        {
            await socket.SendAsync(
                new ArraySegment<byte>(buffer),
                WebSocketMessageType.Text,
                true,
                CancellationToken.None);
        }
        catch (WebSocketException)
        {
            await RemoveSocket(room, pair.Key);
        }
    });
    await Task.WhenAll(tasks);
}
this structure worked on a single pod. connections were kept in memory, and the manager class was running as a singleton. but i knew that the application would not run as a single pod in production.
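one subtlety in `RemoveSocket` is worth guarding against even on a single pod: between the `roomSockets.IsEmpty` check and `_rooms.TryRemove(room, out _)` (a window that the awaited `CloseAsync` makes wider), a concurrent `AddSocket` can get the same inner dictionary from `GetOrAdd` and add its socket to it. the room is then removed with a live socket inside, and that client silently stops receiving broadcasts. a minimal sketch of one way around it, assuming a single plain lock is acceptable for this traffic: `RoomRegistry<T>` is a hypothetical name, and `T` stands in for `WebSocket` so the logic can run without real connections.

```csharp
using System.Collections.Generic;

// Hypothetical RoomRegistry<T>; T stands in for WebSocket so the logic can run
// without real connections. One lock covers both "remove the member" and
// "drop the room if it is now empty", so a concurrent Add can never put a
// member into a room object that has already been detached.
public sealed class RoomRegistry<T> where T : class
{
    private readonly object _lock = new();
    private readonly Dictionary<string, Dictionary<string, T>> _rooms = new();

    public bool Add(string room, string id, T member)
    {
        lock (_lock)
        {
            if (!_rooms.TryGetValue(room, out var members))
            {
                members = new Dictionary<string, T>();
                _rooms[room] = members;
            }
            return members.TryAdd(id, member);
        }
    }

    // Returns the removed member (or null) so the caller can close it after
    // the lock is released; awaiting inside a lock statement is not allowed anyway.
    public T? Remove(string room, string id)
    {
        lock (_lock)
        {
            if (!_rooms.TryGetValue(room, out var members) || !members.Remove(id, out var member))
                return null;
            if (members.Count == 0)
                _rooms.Remove(room);
            return member;
        }
    }

    // A copy taken under the lock, so a broadcast can await sends on it
    // without holding the lock.
    public List<KeyValuePair<string, T>> Snapshot(string room)
    {
        lock (_lock)
        {
            return _rooms.TryGetValue(room, out var members)
                ? new List<KeyValuePair<string, T>>(members)
                : new List<KeyValuePair<string, T>>();
        }
    }

    public bool HasRoom(string room)
    {
        lock (_lock)
        {
            return _rooms.ContainsKey(room);
        }
    }
}
```

in the manager, `RemoveSocket` would call `Remove` and then run `CloseAsync` on the returned socket outside the lock, and `BroadcastMessage` would iterate over `Snapshot(room)`. the trade-off is that every connect, disconnect, and broadcast snapshot briefly takes the same lock.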
when i started the application with multiple pods, the expected problem appeared: a message received by one pod was not reaching websocket connections on the other pod.
so the in-memory room structure worked inside a pod, but it meant nothing to the other pods.
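one more constraint in the broadcast loop applies whether there is one pod or many: the .NET `WebSocket` type documents that exactly one send and one receive may be in progress per instance at a time. if two broadcasts to the same room overlap, both `Select` batches call `SendAsync` on the same sockets. depending on the implementation, overlapping sends may be serialized internally or rejected, so it is safer not to rely on either. a minimal sketch of serializing sends per client, assuming one wrapper object is stored per connection: `SerializedSender` and `OverlapProbe` are hypothetical names, and the send delegate stands in for something like `(p, ct) => socket.SendAsync(p, WebSocketMessageType.Text, true, ct).AsTask()`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical per-client wrapper. The delegate stands in for the real
// WebSocket send so the sketch runs without a socket.
public sealed class SerializedSender
{
    private readonly SemaphoreSlim _sendLock = new(1, 1);
    private readonly Func<ReadOnlyMemory<byte>, CancellationToken, Task> _send;

    public SerializedSender(Func<ReadOnlyMemory<byte>, CancellationToken, Task> send) => _send = send;

    public async Task SendAsync(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken = default)
    {
        // At most one send in flight for this client; later callers wait here.
        await _sendLock.WaitAsync(cancellationToken);
        try
        {
            await _send(payload, cancellationToken);
        }
        finally
        {
            _sendLock.Release();
        }
    }
}

// Test double that records how many sends overlapped at the peak.
public sealed class OverlapProbe
{
    private int _inFlight;
    private int _peak;

    public int Peak => Volatile.Read(ref _peak);

    public async Task SendAsync(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken)
    {
        var current = Interlocked.Increment(ref _inFlight);
        try
        {
            // Raise _peak to `current` if it is higher, retrying on lost races.
            int seen;
            while (current > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, current, seen) != seen)
            {
            }
            await Task.Delay(5, cancellationToken);
        }
        finally
        {
            Interlocked.Decrement(ref _inFlight);
        }
    }
}
```

with this, the room dictionary would hold the wrapper (or a small record with the socket and its lock) instead of the bare `WebSocket`. a slow client only delays its own sends, while the fan-out across different clients stays parallel.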
the multi-pod problem
the need was simple: when a message came to a room, it had to be delivered to all websocket clients connected to that room, no matter which pod they were connected to.
for that, i needed a pub/sub-style structure to distribute messages between pods.
there were two reasonable options: redis pub/sub or rabbitmq.
i considered redis first, but in this project we had decided to use redis mainly for distributed caching and fast data access, so it made more sense to use rabbitmq for message distribution.
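independent of which broker is chosen, the shape of the multi-pod flow can be sketched with an in-memory stand-in. this is a minimal, single-threaded sketch under stated assumptions: `InMemoryRoomBus` and `Pod` are hypothetical names, the bus plays the broker's role, and string lists stand in for websocket clients. the rule it illustrates is that a pod receiving a message does not push it to its own sockets directly; it only publishes, and every subscribed pod (including the sender) delivers from its subscription callback, so each client gets exactly one copy.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the broker (hypothetical, not a RabbitMQ API):
// every pod subscribed to a room receives each message published to it.
public sealed class InMemoryRoomBus
{
    private readonly Dictionary<string, List<Action<string>>> _subscribers = new();

    public void Subscribe(string room, Action<string> onMessage)
    {
        if (!_subscribers.TryGetValue(room, out var handlers))
            _subscribers[room] = handlers = new List<Action<string>>();
        handlers.Add(onMessage);
    }

    public void Publish(string room, string message)
    {
        if (!_subscribers.TryGetValue(room, out var handlers))
            return;
        foreach (var onMessage in handlers)
            onMessage(message);
    }
}

// Hypothetical pod: keeps its own clients in memory (inboxes stand in for
// WebSockets) and subscribes to the bus once per room.
public sealed class Pod
{
    private readonly InMemoryRoomBus _bus;
    private readonly Dictionary<string, List<List<string>>> _clientsByRoom = new();

    public Pod(InMemoryRoomBus bus) => _bus = bus;

    public List<string> Connect(string room)
    {
        if (!_clientsByRoom.TryGetValue(room, out var clients))
        {
            _clientsByRoom[room] = clients = new List<List<string>>();
            // First local client in this room: this pod starts listening.
            _bus.Subscribe(room, message =>
            {
                foreach (var inbox in clients)
                    inbox.Add(message);
            });
        }
        var newInbox = new List<string>();
        clients.Add(newInbox);
        return newInbox;
    }

    // A message arriving at this pod is only published; local clients get it
    // back through the subscription, so nobody receives it twice.
    public void Receive(string room, string message) => _bus.Publish(room, message);
}
```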
rabbitmq side: single exchange, room-based routing key
the first idea was to create a separate fanout exchange for each room, but that would not scale well.
as the number of rooms grew, so would the number of exchanges on the broker. on top of that, declaring an exchange before every publish would add an unnecessary round-trip on the hot path.
instead, i used a single direct exchange. the exchange name was fixed, and the room name was used as the routing key.
the logic was like this:
each pod creates its own exclusive, auto-delete queue for each room it is interested in, then binds that queue to the direct exchange with `routingKey = room`.
so when a message is published to a room, the message goes to the queues of all pods subscribed to that room. if the pod shuts down or the subscription is disposed, the queue is also cleaned up automatically on the broker side.
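to make the routing rules concrete, here is a toy model of the broker side only. it is an illustration, not rabbitmq's implementation: `DirectExchangeModel`, `DeclareAndBind`, `CancelLastConsumer`, and the `pod-a.cp-1` style queue names are all hypothetical. it shows one exchange with a binding table keyed by routing key, delivery only to queues bound with exactly that key, and a queue disappearing together with its bindings once its last consumer goes away, which is what `autoDelete` does.

```csharp
using System.Collections.Generic;

// Toy model of the broker side, for illustration only (not RabbitMQ's code):
// one direct exchange, a binding table keyed by routing key, and per-pod
// queues that vanish once nothing consumes from them (auto-delete).
public sealed class DirectExchangeModel
{
    private readonly Dictionary<string, HashSet<string>> _bindings = new();
    private readonly Dictionary<string, Queue<string>> _queues = new();

    public IReadOnlyCollection<string> QueueNames => _queues.Keys;

    // Pod side: declare an own queue and bind it with routingKey = room.
    public void DeclareAndBind(string queueName, string room)
    {
        if (!_queues.ContainsKey(queueName))
            _queues[queueName] = new Queue<string>();
        if (!_bindings.TryGetValue(room, out var bound))
            _bindings[room] = bound = new HashSet<string>();
        bound.Add(queueName);
    }

    // Direct routing: only queues bound with exactly this key get a copy.
    // Returns how many queues received the message.
    public int Publish(string routingKey, string message)
    {
        if (!_bindings.TryGetValue(routingKey, out var bound))
            return 0; // no matching binding: the message is dropped
        foreach (var queueName in bound)
            _queues[queueName].Enqueue(message);
        return bound.Count;
    }

    public int Depth(string queueName) => _queues.TryGetValue(queueName, out var q) ? q.Count : 0;

    // Models the last consumer going away (e.g. its channel closed): the
    // auto-delete queue is removed together with all of its bindings.
    public void CancelLastConsumer(string queueName)
    {
        _queues.Remove(queueName);
        foreach (var (room, bound) in new List<KeyValuePair<string, HashSet<string>>>(_bindings))
        {
            bound.Remove(queueName);
            if (bound.Count == 0)
                _bindings.Remove(room);
        }
    }
}
```

in real rabbitmq, the pod side would be a server-named queue declared with `exclusive: true, autoDelete: true` and bound with the room as routing key, and publishers would send to the fixed exchange name with the same key. the exchange count stays at one no matter how many rooms exist, which is the point of replacing per-room fanout exchanges.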
to avoid declaring the exchange every time, i kept a small flag inside the connection wrapper.
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
    await _gate.WaitAsync(cancellationToken);
    try
    {
        if (_connection is null || !_connection.IsOpen)
        {
            if (_connection is not null)
                await _connection.DisposeAsync();
            _connection = await _factory.CreateConnectionAsync(cancellationToken);
            _exchangeDeclared = false;
        }
        var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
        if (!_exchangeDeclared)
        {
            await channel.ExchangeDeclareAsync(
                ExchangeName,
                ExchangeType.Direct,
                durable: false,
                autoDelete: false,
                cancellationToken: cancellationToken);
            _exchangeDeclared = true;
        }
        return channel;
    }
    finally
    {
        _gate.Release();
    }
}
subscription lifecycle
the second important part was the subscription lifecycle.
when the last client connected to a room disconnected, the consumer and queue created for that room also had to be closed. otherwise, consumers for inactive rooms would keep running, and empty queues would start accumulating on the broker.
to handle this, i modeled the subscription as `IAsyncDisposable`. when the last client disconnects, the subscription is disposed. when the channel is closed, the consumer is also removed, and the exclusive/auto-delete queue is cleaned up by the broker.
public async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposed, 1) == 1)
        return;
    await _registration.DisposeAsync();
    await _channel.DisposeAsync();
}
if multiple clients connect to the same room inside the same pod, i did not want to create a separate queue for every client. instead, i kept one room subscription per pod and counted how many clients were connected to that room.
when the last client left, the subscription was closed.
public async Task UnsubscribeAsync(string room)
{
    RoomSubscription? toDispose = null;
    await _gate.WaitAsync();
    try
    {
        if (_rooms.TryGetValue(room, out var sub) && sub.RemoveListener() == 0)
        {
            _rooms.Remove(room);
            toDispose = sub;
        }
    }
    finally
    {
        _gate.Release();
    }
    if (toDispose is not null)
        await toDispose.DisposeAsync();
}
one detail here is important: the dispose operation runs outside the lock. closing a channel is async and can be relatively slow. keeping that inside the lock could block other subscribe/unsubscribe operations for no good reason.
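`UnsubscribeAsync` shows the leaving side; a matching joining side might look like the following minimal sketch, assuming the same gate + dictionary shape. `RoomSubscriptionManager`, `RefCountedSubscription`, and `CountingSubscription` are hypothetical names, and the factory delegate stands in for the queue declare/bind/consume calls, so the reference counting and the dispose-outside-the-gate rule can be exercised without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical manager: one subscription per room per pod, shared by all local
// clients and reference-counted. The factory stands in for the broker setup.
public sealed class RoomSubscriptionManager
{
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly Dictionary<string, RefCountedSubscription> _rooms = new();
    private readonly Func<string, Task<IAsyncDisposable>> _createSubscription;

    public RoomSubscriptionManager(Func<string, Task<IAsyncDisposable>> createSubscription)
        => _createSubscription = createSubscription;

    public async Task SubscribeAsync(string room)
    {
        await _gate.WaitAsync();
        try
        {
            if (!_rooms.TryGetValue(room, out var sub))
            {
                // First local listener: create the broker-side subscription once.
                sub = new RefCountedSubscription(await _createSubscription(room));
                _rooms[room] = sub;
            }
            sub.Listeners++;
        }
        finally
        {
            _gate.Release();
        }
    }

    public async Task UnsubscribeAsync(string room)
    {
        IAsyncDisposable? toDispose = null;
        await _gate.WaitAsync();
        try
        {
            if (_rooms.TryGetValue(room, out var sub) && --sub.Listeners == 0)
            {
                _rooms.Remove(room);
                toDispose = sub.Inner;
            }
        }
        finally
        {
            _gate.Release();
        }
        // The slow async teardown happens after the gate is released.
        if (toDispose is not null)
            await toDispose.DisposeAsync();
    }

    private sealed class RefCountedSubscription
    {
        public RefCountedSubscription(IAsyncDisposable inner) => Inner = inner;
        public IAsyncDisposable Inner { get; }
        public int Listeners; // only read or written while holding _gate
    }
}

// Test double: counts DisposeAsync calls instead of closing a channel.
public sealed class CountingSubscription : IAsyncDisposable
{
    public int DisposeCount { get; private set; }

    public ValueTask DisposeAsync()
    {
        DisposeCount++;
        return ValueTask.CompletedTask;
    }
}
```

in this sketch, creation stays inside the gate so two "first" clients cannot declare two queues for the same room, at the cost of serializing subscribe calls across rooms while a queue is being declared; a per-room lock would relax that. a subscribe that arrives while the previous subscription is still being disposed simply creates a fresh one, because the old entry is already gone from the dictionary.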
result
at the end of this poc, the simple native websocket structure turned into a room-based realtime broadcast setup that could work across multiple pods.
inside a single pod, websocket connections were kept in memory, grouped by room. for message distribution between pods, i used a rabbitmq direct exchange with the room name as the routing key. each pod created its own exclusive/auto-delete queue for each room it was interested in, and when the last local client left a room, that subscription was cleaned up.
with this structure, clients connected to the same room could receive the same message even if they were connected to different pods.
this was a useful exercise for me, especially for native websocket management, rabbitmq routing, subscription lifecycle, multi-pod scenarios, and realtime message distribution.
repo is here
...