Building a Real-Time Notification System with WebSockets and Redis
Building scalable real-time features is one of the most challenging aspects of modern web development. In this guide, we'll walk through creating a notification system that can scale across multiple server instances to handle millions of concurrent connections while maintaining low latency.
Why WebSockets?
Traditional HTTP polling has significant drawbacks for real-time applications. Every poll request creates overhead, and you're always trading off between latency and server load. WebSockets solve this by maintaining a persistent, bidirectional connection.
"WebSockets reduce latency from seconds to milliseconds, transforming what's possible in web applications." ā High Performance Browser Networking
Here's a quick comparison:
| Approach | Latency | Server Load | Complexity |
| --- | --- | --- | --- |
| HTTP Polling | High (1-10s) | Very High | Low |
| Long Polling | Medium (0.5-2s) | High | Medium |
| Server-Sent Events | Low (50-200ms) | Medium | Medium |
| WebSockets | Very Low (<50ms) | Low | Higher |
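To make the contrast concrete, this is the entire client side of a WebSocket exchange using the standard browser API; the URL and channel name below are placeholders:

// Standard browser WebSocket API: one persistent, bidirectional connection.
const socket = new WebSocket('wss://example.com/ws'); // placeholder URL

socket.addEventListener('open', () => {
  // No polling loop: we ask once, then the server pushes.
  socket.send(JSON.stringify({ type: 'subscribe', channel: 'news' }));
});

socket.addEventListener('message', (event) => {
  // Each push arrives the moment the server sends it, with no per-request overhead.
  console.log('received:', event.data);
});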
Prerequisites
Before we start, make sure you have the following installed:
- Node.js 20+
- Redis 7+
- A recent stable Rust toolchain (the WebSocket server below is written in Rust)
- Basic understanding of TypeScript
You can verify your installations with:
node --version # Should output v20.x.x or higher
redis-cli ping # Should output PONG
Project Setup
Let's initialize our project and install dependencies:
mkdir realtime-notifications
cd realtime-notifications
npm init -y
npm install ws redis typescript @types/ws @types/node tsx
Create a tsconfig.json with the following configuration:
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"strict": ,
"esModuleInterop": ,
"outDir": "./dist",
"rootDir": "./src"
},
"include": ["src/**/*"]
}
The WebSocket Server
Here's our main server implementation. While the tooling above is Node-based (it covers the client and load-test scripts), the server itself is written in Rust for performance. Create src/server.rs, with tokio, tokio-tungstenite, redis, futures-util, serde, serde_json, and uuid declared as dependencies in Cargo.toml:
//! Real-Time Notification Server
//! Rust implementation using tokio, tungstenite, and redis
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use futures_util::{SinkExt, StreamExt};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use uuid::Uuid;
// ============================================================================
// Types
// ============================================================================
type Clients = Arc<RwLock<HashMap<String, Client>>>;
type WsSender = mpsc::UnboundedSender<Message>;
struct Client {
sender: WsSender,
user_id: String,
subscribed_channels: HashSet<String>,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum IncomingMessage {
    Subscribe { channel: String },
    Unsubscribe { channel: String },
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OutgoingMessage {
    #[serde(rename = "type")]
    msg_type: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    client_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    channel: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    data: Option<String>,
    timestamp: u128,
}
// ============================================================================
// NotificationServer
// ============================================================================
struct NotificationServer {
port: u16,
clients: Clients,
}
impl NotificationServer {
fn new(port: u16) -> Self {
Self {
port,
clients: Arc::new(RwLock::new(HashMap::new())),
}
}
async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("0.0.0.0:{}", self.port);
let listener = TcpListener::bind(&addr).await?;
println!("š WebSocket server running on port {}", self.port);
// Spawn Redis pub/sub listener
let clients_clone = self.clients.clone();
tokio::spawn(async move {
if let Err(e) = listen_redis(clients_clone).await {
eprintln!("Redis listener error: {}", e);
}
});
// Accept WebSocket connections
while let Ok((stream, addr)) = listener.accept().await {
let clients = self.clients.clone();
tokio::spawn(handle_connection(stream, addr, clients));
}
Ok(())
}
}
// ============================================================================
// Connection Handling
// ============================================================================
async fn handle_connection(stream: TcpStream, addr: SocketAddr, clients: Clients) {
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
eprintln!("WebSocket handshake failed: {}", e);
return;
}
};
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let client_id = Uuid::new_v4().to_string();
let user_id = extract_user_id(&addr); // Simplified - would parse from query params
// Register client
{
let client = Client {
sender: tx.clone(),
user_id: user_id.clone(),
subscribed_channels: HashSet::new(),
};
clients.write().await.insert(client_id.clone(), client);
}
println!("ā
Client connected: {} (User: {})", client_id, user_id);
// Send welcome message
let welcome = OutgoingMessage {
msg_type: "connected".to_string(),
client_id: Some(client_id.clone()),
channel: None,
data: None,
timestamp: timestamp(),
};
let _ = tx.send(Message::Text(serde_json::to_string(&welcome).unwrap().into()));
// Task to forward messages from channel to WebSocket
let send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if ws_sender.send(msg).await.is_err() {
break;
}
}
});
// Process incoming messages
let client_id_clone = client_id.clone();
let clients_clone = clients.clone();
while let Some(result) = ws_receiver.next().await {
match result {
Ok(Message::Text(text)) => {
handle_message(&client_id_clone, &text, &clients_clone).await;
}
Ok(Message::Close(_)) => break,
Err(_) => break,
_ => {}
}
}
// Cleanup
clients.write().await.remove(&client_id);
send_task.abort();
println!("ā Client disconnected: {}", client_id);
}
async fn handle_message(client_id: &str, raw: &str, clients: &Clients) {
let message: IncomingMessage = match serde_json::from_str(raw) {
Ok(msg) => msg,
Err(e) => {
eprintln!("Failed to parse message: {}", e);
return;
}
};
let mut clients_guard = clients.write().await;
let client = match clients_guard.get_mut(client_id) {
Some(c) => c,
None => return,
};
match message {
IncomingMessage::Subscribe { channel } => {
client.subscribed_channels.insert(channel.clone());
let response = OutgoingMessage {
msg_type: "subscribed".to_string(),
client_id: None,
channel: Some(channel.clone()),
data: None,
timestamp: timestamp(),
};
let _ = client.sender.send(Message::Text(
serde_json::to_string(&response).unwrap().into()
));
println!("š¢ Client {} subscribed to: {}", client_id, channel);
}
IncomingMessage::Unsubscribe { channel } => {
client.subscribed_channels.remove(&channel);
let response = OutgoingMessage {
msg_type: "unsubscribed".to_string(),
client_id: None,
channel: Some(channel.clone()),
data: None,
timestamp: timestamp(),
};
let _ = client.sender.send(Message::Text(
serde_json::to_string(&response).unwrap().into()
));
println!("š Client {} unsubscribed from: {}", client_id, channel);
}
}
}
// ============================================================================
// Redis Pub/Sub
// ============================================================================
async fn listen_redis(clients: Clients) -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1/")?;
let mut pubsub = redis_client.get_async_pubsub().await?;
pubsub.psubscribe("notifications:*").await?;
println!("š” Subscribed to Redis notifications:*");
let mut stream = pubsub.on_message();
while let Some(msg) = stream.next().await {
        let channel: String = msg.get_channel()?;
        let payload: String = msg.get_payload()?;
        // Strip the "notifications:" prefix so the name matches what clients
        // subscribe to ("user:123" rather than "notifications:user:123").
        let channel = channel.strip_prefix("notifications:").unwrap_or(&channel);
        broadcast_to_channel(&clients, channel, &payload).await;
}
Ok(())
}
async fn broadcast_to_channel(clients: &Clients, channel: &str, data: &str) {
let clients_guard = clients.read().await;
for client in clients_guard.values() {
if client.subscribed_channels.contains(channel) {
let notification = OutgoingMessage {
msg_type: "notification".to_string(),
client_id: None,
channel: Some(channel.to_string()),
data: Some(data.to_string()),
timestamp: timestamp(),
};
let _ = client.sender.send(Message::Text(
                serde_json::to_string(&notification).unwrap().into()
));
}
}
}
// ============================================================================
// Utilities
// ============================================================================
fn extract_user_id(addr: &SocketAddr) -> String {
// Simplified - in production, parse from WebSocket upgrade request query params
format!("user_{}", addr.port())
}
fn timestamp() -> u128 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
}
// ============================================================================
// Main
// ============================================================================
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = NotificationServer::new(8080);
server.start().await
}

Key Design Decisions
Notice a few important patterns in the code above:
- Client tracking uses a HashMap behind an async RwLock for O(1) lookups
- Redis pub/sub enables horizontal scaling across multiple server instances (a publisher sketch follows this list)
- Channel-based subscriptions allow fine-grained notification targeting
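Nothing in server.rs creates notifications; any backend process can trigger one by publishing to a notifications:* channel, and every server instance relays it to its own connected clients. Here's a minimal publisher sketch using the node-redis package installed earlier; the channel name and payload shape are illustrative, not a fixed contract:

// publisher.ts - a minimal sketch of triggering a notification via Redis pub/sub.
// The notifications:<channel> naming matches the psubscribe pattern in server.rs.
import { createClient } from 'redis';

async function publishNotification(channel: string, payload: object): Promise<void> {
  const redis = createClient({ url: 'redis://127.0.0.1:6379' });
  await redis.connect();
  // Every server instance subscribed to notifications:* receives this message
  // and forwards it to its locally connected WebSocket clients.
  await redis.publish(`notifications:${channel}`, JSON.stringify(payload));
  await redis.quit();
}

// Example: notify everyone subscribed to the "user:123" channel.
publishNotification('user:123', { title: 'New follower', body: 'Alice followed you' })
  .catch(console.error);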
The Client Implementation
For the frontend, we'll create a reconnecting WebSocket wrapper:
// client/NotificationClient.ts
type MessageHandler = (data: any) => void;
export class NotificationClient {
private ws: WebSocket | null = null;
private url: string;
private handlers: Map<string, Set<MessageHandler>> = new Map();
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
constructor(url: string) {
this.url = url;
this.connect();
}
private connect(): void {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('Connected to notification server');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.emit(data.type, data);
};
this.ws.onclose = () => {
this.scheduleReconnect();
};
}
private scheduleReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
return;
}
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
this.reconnectAttempts++;
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect(), delay);
}
on(event: string, handler: MessageHandler): () => void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
}
this.handlers.get(event)!.add(handler);
// Return unsubscribe function
return () => this.handlers.get(event)?.delete(handler);
}
private emit(event: string, data: any): void {
this.handlers.get(event)?.forEach((handler) => handler(data));
}
subscribe(channel: string): void {
this.send({ type: 'subscribe', channel });
}
private send(data: object): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
}
Usage in a React component:
import { useEffect, useState } from 'react';
import { NotificationClient } from './NotificationClient';
export function NotificationBell() {
const [notifications, setNotifications] = useState<Notification[]>([]);
const [unreadCount, setUnreadCount] = useState(0);
useEffect(() => {
const client = new NotificationClient('wss://api.example.com/ws');
client.subscribe('user:123');
    const unsubscribe = client.on('notification', (data) => {
      // The server delivers the payload in the message's `data` field as a JSON string
      setNotifications((prev) => [JSON.parse(data.data), ...prev]);
      setUnreadCount((prev) => prev + 1);
});
return () => {
unsubscribe();
};
}, []);
  return (
    <button className="notification-bell" aria-label="Notifications">
      {unreadCount > 0 && (
        <span className="badge">{unreadCount}</span>
      )}
    </button>
  );
}
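One loose end: the Notification type used by the component isn't defined inline. Per the file structure at the end of this post it would live in src/types/index.ts; here's a minimal sketch, with field names that are illustrative rather than prescribed:

// src/types/index.ts - hypothetical notification shape; adapt to your app.
export interface Notification {
  id: string;
  title: string;
  body: string;
  channel: string;
  createdAt: number; // Unix epoch milliseconds, matching the server's timestamp field
}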
Load Testing
Before deploying, let's verify our system can handle the load. Here's a simple load test script:
// loadtest.ts
import WebSocket from 'ws';
const NUM_CLIENTS = 10000;
const SERVER_URL = 'ws://localhost:8080';
async function runLoadTest() {
const connections: WebSocket[] = [];
const startTime = Date.now();
console.log(`Starting load test with ${NUM_CLIENTS} clients...`);
for (let i = 0; i < NUM_CLIENTS; i++) {
const ws = new WebSocket(SERVER_URL);
ws.on('open', () => {
ws.send(JSON.stringify({
type: 'subscribe',
channel: `user:${i}`
}));
});
connections.push(ws);
// Stagger connections to avoid thundering herd
if (i % 100 === 0) {
      await new Promise((r) => setTimeout(r, 10));
      console.log(`Connected ${i} clients...`);
}
}
const elapsed = Date.now() - startTime;
  console.log(`All ${NUM_CLIENTS} clients connected in ${elapsed}ms`);
// Cleanup
setTimeout(() => {
connections.forEach((ws) => ws.close());
process.exit(0);
}, 5000);
}
runLoadTest();
Run it with:
npx tsx loadtest.ts
You should see output like:
Starting load test with 10000 clients...
Connected 100 clients...
Connected 200 clients...
...
All 10000 clients connected in 2847ms
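While the load test clients are connected, you can also verify end-to-end delivery by publishing straight from redis-cli; the channel suffix just has to match one the test clients subscribed to:

redis-cli publish "notifications:user:42" '{"title":"hello"}'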
Deployment Considerations
When deploying to production, keep these factors in mind:
Horizontal Scaling
Since WebSocket connections are stateful, you'll need a load balancer that supports sticky sessions. Here's an example nginx configuration:
upstream websocket_servers {
ip_hash; # Sticky sessions based on client IP
server ws1.internal:8080;
server ws2.internal:8080;
server ws3.internal:8080;
}
server {
listen 443 ssl;
server_name ws.example.com;
location / {
proxy_pass http://websocket_servers;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 86400;
}
}
Monitoring
Add these metrics to your monitoring stack:
- ws_connections_total – Total active connections
- ws_messages_sent_total – Messages sent per second
- ws_message_latency_ms – End-to-end message latency
- redis_pubsub_messages – Redis pub/sub throughput
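As a rough sketch, the first two could be exported from a Node process with the prom-client package (an extra dependency, not part of the setup above; on the Rust server you'd reach for the prometheus crate instead):

// metrics.ts - hypothetical Prometheus wiring using prom-client (extra dependency).
import client from 'prom-client';

export const wsConnections = new client.Gauge({
  name: 'ws_connections_total',
  help: 'Total active WebSocket connections',
});

export const wsMessagesSent = new client.Counter({
  name: 'ws_messages_sent_total',
  help: 'Total messages sent to clients',
});

// Wire these into the connection lifecycle:
//   wsConnections.inc() on connect, wsConnections.dec() on disconnect,
//   wsMessagesSent.inc() for each outgoing message.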
Conclusion
We've built a scalable notification system that can:
- Handle thousands of concurrent connections per server
- Scale horizontally with Redis pub/sub
- Automatically reconnect on failure
- Target notifications to specific channels/users
The full source code is available on GitHub.
Have questions or feedback? Reach out on Twitter or leave a comment below.
<details>
<summary>Click to see the complete file structure</summary>
realtime-notifications/
├── src/
│   ├── server.rs
│   ├── client/
│   │   └── NotificationClient.ts
│   └── types/
│       └── index.ts
├── Cargo.toml
├── package.json
├── tsconfig.json
└── README.md
</details>
Written by Ahmed Fathy
Author at NextGen Softwares