Building a Real-Time Notification System with WebSockets and Redis | Tutorial

Ahmed Fathy

5 min read

Building scalable real-time features is one of the most challenging aspects of modern web development. In this guide, we'll walk through creating a notification system that handles thousands of concurrent connections per server, scales horizontally through Redis pub/sub, and keeps latency low.

Why WebSockets?

Traditional HTTP polling has significant drawbacks for real-time applications. Every poll request creates overhead, and you're always trading off between latency and server load. WebSockets solve this by maintaining a persistent, bidirectional connection.
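To make the latency versus load trade-off concrete, here is a rough back-of-the-envelope sketch. It assumes events arrive at uniformly random times between polls, so the average wait is half the polling interval, and it ignores request time. The function name and the numbers are illustrative only.

```typescript
// Rough model of HTTP polling cost. Assumption: events arrive uniformly at
// random between polls, and the time spent on the request itself is ignored.
interface PollingCost {
  avgDelayMs: number;        // average wait before a client sees a new event
  requestsPerSecond: number; // total request rate hitting the server
}

function estimatePollingCost(intervalMs: number, clients: number): PollingCost {
  if (intervalMs <= 0 || clients < 0) {
    throw new RangeError('intervalMs must be > 0 and clients must be >= 0');
  }
  return {
    avgDelayMs: intervalMs / 2,
    requestsPerSecond: (clients * 1000) / intervalMs,
  };
}

// 10,000 clients polling every 5 seconds vs. every 500 ms
console.log(estimatePollingCost(5000, 10_000)); // { avgDelayMs: 2500, requestsPerSecond: 2000 }
console.log(estimatePollingCost(500, 10_000));  // { avgDelayMs: 250, requestsPerSecond: 20000 }
```

Cutting the interval by 10x cuts the average delay by 10x but multiplies the request rate by 10x, which is the trade-off a persistent connection avoids.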

"WebSockets reduce latency from seconds to milliseconds, transforming what's possible in web applications." — High Performance Browser Networking

Here's a quick comparison:

Approach             Latency            Server Load   Complexity
HTTP Polling         High (1-10s)       Very High     Low
Long Polling         Medium (0.5-2s)    High          Medium
Server-Sent Events   Low (50-200ms)     Medium        Medium
WebSockets           Very Low (<50ms)   Low           Higher

Prerequisites

Before we start, make sure you have the following installed:

  1. Node.js 20+
  2. Redis 7+
  3. Basic understanding of TypeScript

You can verify your installations with:

node --version    # Should output v20.x.x or higher
redis-cli ping    # Should output PONG

Project Setup

Let's initialize our project and install dependencies:

mkdir realtime-notifications
cd realtime-notifications
npm init -y
npm install ws redis typescript @types/ws @types/node tsx

Create a tsconfig.json with the following configuration:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "strict": true,
    "esModuleInterop": true,
    "outDir": "./dist",
    "rootDir": "./src"
  },
  "include": ["src/**/*"]
}

The WebSocket Server

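Here's our main server implementation. The listing below is written in Rust (tokio, tokio-tungstenite, and the redis crate), so it belongs in a Cargo project rather than in src/server.ts: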

//! Real-Time Notification Server
//! Rust implementation using tokio, tungstenite, and redis

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;

use futures_util::{SinkExt, StreamExt};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use uuid::Uuid;

// ============================================================================
// Types
// ============================================================================

type Clients = Arc<RwLock<HashMap<String, Client>>>;
type WsSender = mpsc::UnboundedSender<Message>;

struct Client {
    sender: WsSender,
    user_id: String,
    subscribed_channels: HashSet<String>,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum IncomingMessage {
    Subscribe { channel: String },
    Unsubscribe { channel: String },
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OutgoingMessage {
    #[serde(rename = "type")]
    msg_type: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    client_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    channel: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    data: Option<String>,
    timestamp: u128,
}

// ============================================================================
// NotificationServer
// ============================================================================

struct NotificationServer {
    port: u16,
    clients: Clients,
}

impl NotificationServer {
    fn new(port: u16) -> Self {
        Self {
            port,
            clients: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        let addr = format!("0.0.0.0:{}", self.port);
        let listener = TcpListener::bind(&addr).await?;
        
        println!("🚀 WebSocket server running on port {}", self.port);

        // Spawn Redis pub/sub listener
        let clients_clone = self.clients.clone();
        tokio::spawn(async move {
            if let Err(e) = listen_redis(clients_clone).await {
                eprintln!("Redis listener error: {}", e);
            }
        });

        // Accept WebSocket connections
        while let Ok((stream, addr)) = listener.accept().await {
            let clients = self.clients.clone();
            tokio::spawn(handle_connection(stream, addr, clients));
        }

        Ok(())
    }
}

// ============================================================================
// Connection Handling
// ============================================================================

async fn handle_connection(stream: TcpStream, addr: SocketAddr, clients: Clients) {
    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            eprintln!("WebSocket handshake failed: {}", e);
            return;
        }
    };

    let (mut ws_sender, mut ws_receiver) = ws_stream.split();
    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

    let client_id = Uuid::new_v4().to_string();
    let user_id = extract_user_id(&addr); // Simplified - would parse from query params

    // Register client
    {
        let client = Client {
            sender: tx.clone(),
            user_id: user_id.clone(),
            subscribed_channels: HashSet::new(),
        };
        clients.write().await.insert(client_id.clone(), client);
    }

    println!("✅ Client connected: {} (User: {})", client_id, user_id);

    // Send welcome message
    let welcome = OutgoingMessage {
        msg_type: "connected".to_string(),
        client_id: Some(client_id.clone()),
        channel: None,
        data: None,
        timestamp: timestamp(),
    };
    let _ = tx.send(Message::Text(serde_json::to_string(&welcome).unwrap().into()));

    // Task to forward messages from channel to WebSocket
    let send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if ws_sender.send(msg).await.is_err() {
                break;
            }
        }
    });

    // Process incoming messages
    let client_id_clone = client_id.clone();
    let clients_clone = clients.clone();
    
    while let Some(result) = ws_receiver.next().await {
        match result {
            Ok(Message::Text(text)) => {
                handle_message(&client_id_clone, &text, &clients_clone).await;
            }
            Ok(Message::Close(_)) => break,
            Err(_) => break,
            _ => {}
        }
    }

    // Cleanup
    clients.write().await.remove(&client_id);
    send_task.abort();
    println!("❌ Client disconnected: {}", client_id);
}

async fn handle_message(client_id: &str, raw: &str, clients: &Clients) {
    let message: IncomingMessage = match serde_json::from_str(raw) {
        Ok(msg) => msg,
        Err(e) => {
            eprintln!("Failed to parse message: {}", e);
            return;
        }
    };

    let mut clients_guard = clients.write().await;
    let client = match clients_guard.get_mut(client_id) {
        Some(c) => c,
        None => return,
    };

    match message {
        IncomingMessage::Subscribe { channel } => {
            client.subscribed_channels.insert(channel.clone());
            
            let response = OutgoingMessage {
                msg_type: "subscribed".to_string(),
                client_id: None,
                channel: Some(channel.clone()),
                data: None,
                timestamp: timestamp(),
            };
            let _ = client.sender.send(Message::Text(
                serde_json::to_string(&response).unwrap().into()
            ));
            println!("📢 Client {} subscribed to: {}", client_id, channel);
        }
        IncomingMessage::Unsubscribe { channel } => {
            client.subscribed_channels.remove(&channel);
            
            let response = OutgoingMessage {
                msg_type: "unsubscribed".to_string(),
                client_id: None,
                channel: Some(channel.clone()),
                data: None,
                timestamp: timestamp(),
            };
            let _ = client.sender.send(Message::Text(
                serde_json::to_string(&response).unwrap().into()
            ));
            println!("🔕 Client {} unsubscribed from: {}", client_id, channel);
        }
    }
}

// ============================================================================
// Redis Pub/Sub
// ============================================================================

async fn listen_redis(clients: Clients) -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1/")?;
    let mut pubsub = redis_client.get_async_pubsub().await?;
    
    pubsub.psubscribe("notifications:*").await?;
    println!("📡 Subscribed to Redis notifications:*");

    let mut stream = pubsub.on_message();
    
    while let Some(msg) = stream.next().await {
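        let channel: String = msg.get_channel()?;
        let payload: String = msg.get_payload()?;

        // Clients subscribe to names such as "user:123", while Redis reports
        // "notifications:user:123"; strip the prefix so the two can match.
        let target = channel.strip_prefix("notifications:").unwrap_or(&channel);
        broadcast_to_channel(&clients, target, &payload).await;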
    }

    Ok(())
}

async fn broadcast_to_channel(clients: &Clients, channel: &str, data: &str) {
    let clients_guard = clients.read().await;
    
    for client in clients_guard.values() {
        if client.subscribed_channels.contains(channel) {
            let notification = OutgoingMessage {
                msg_type: "notification".to_string(),
                client_id: None,
                channel: Some(channel.to_string()),
                data: Some(data.to_string()),
                timestamp: timestamp(),
            };
            let _ = client.sender.send(Message::Text(
                serde_json::to_string(&notification).unwrap().into()
            ));
        }
    }
}

// ============================================================================
// Utilities
// ============================================================================

fn extract_user_id(addr: &SocketAddr) -> String {
    // Simplified - in production, parse from WebSocket upgrade request query params
    format!("user_{}", addr.port())
}

fn timestamp() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis()
}

// ============================================================================
// Main
// ============================================================================

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = NotificationServer::new(8080);
    server.start().await
}

Key Design Decisions

Notice a few important patterns in the code above:

  • Client tracking uses a HashMap keyed by client ID for O(1) average-case lookups
  • Redis pub/sub enables horizontal scaling across multiple server instances
  • Channel-based subscriptions allow fine-grained notification targeting
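The same client-tracking and channel-targeting pattern can be sketched in TypeScript. This is a minimal in-memory illustration, not the server's actual code; the class and method names are hypothetical.

```typescript
// Minimal in-memory subscription registry (a sketch; names are hypothetical).
type ClientId = string;

class SubscriptionRegistry {
  // clientId -> set of channels that client subscribed to
  private channelsByClient = new Map<ClientId, Set<string>>();

  addClient(id: ClientId): void {
    if (!this.channelsByClient.has(id)) this.channelsByClient.set(id, new Set());
  }

  removeClient(id: ClientId): void {
    this.channelsByClient.delete(id);
  }

  subscribe(id: ClientId, channel: string): boolean {
    const channels = this.channelsByClient.get(id); // O(1) average-case lookup
    if (!channels) return false;
    channels.add(channel);
    return true;
  }

  unsubscribe(id: ClientId, channel: string): boolean {
    return this.channelsByClient.get(id)?.delete(channel) ?? false;
  }

  // Clients that should receive a message published on `channel`.
  recipients(channel: string): ClientId[] {
    const result: ClientId[] = [];
    for (const [id, channels] of this.channelsByClient) {
      if (channels.has(channel)) result.push(id);
    }
    return result;
  }
}

const registry = new SubscriptionRegistry();
registry.addClient('a');
registry.addClient('b');
registry.subscribe('a', 'user:123');
registry.subscribe('b', 'user:456');
console.log(registry.recipients('user:123')); // [ 'a' ]
```

Note that recipients() scans every client, as the server's broadcast loop does. If broadcasts dominate, a reverse index from channel to client IDs would avoid the scan at the cost of extra bookkeeping.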

The Client Implementation

For the frontend, we'll create a reconnecting WebSocket wrapper:

// client/NotificationClient.ts

type MessageHandler = (data: any) => void;

export class NotificationClient {
  private ws: WebSocket | null = null;
  private url: string;
  private handlers: Map<string, Set<MessageHandler>> = new Map();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;

  constructor(url: string) {
    this.url = url;
    this.connect();
  }

  private connect(): void {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('Connected to notification server');
      this.reconnectAttempts = 0;
    };

    this.ws.onmessage = (event: MessageEvent) => {
      const data = JSON.parse(event.data);
      this.emit(data.type, data);
    };

    this.ws.onclose = () => {
      this.scheduleReconnect();
    };
  }

  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      return;
    }

    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
    this.reconnectAttempts++;
    
    console.log(`Reconnecting in ${delay}ms...`);
    setTimeout(() => this.connect(), delay);
  }

  on(event: string, handler: MessageHandler): () => void {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, new Set());
    }
    this.handlers.get(event)!.add(handler);
    
    // Return unsubscribe function
    return () => this.handlers.get(event)?.delete(handler);
  }

  private emit(event: string, data: any): void {
    this.handlers.get(event)?.forEach((handler) => handler(data));
  }

  subscribe(channel: string): void {
    this.send({ type: 'subscribe', channel });
  }

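  private send(data: object): void {
    const ws = this.ws;
    if (!ws) return;
    const payload = JSON.stringify(data);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(payload);
    } else if (ws.readyState === WebSocket.CONNECTING) {
      // Defer until the socket opens; otherwise a subscribe() issued right
      // after construction would be silently dropped.
      ws.addEventListener('open', () => ws.send(payload), { once: true });
    }
  }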
}
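The reconnect schedule produced by scheduleReconnect() can be checked in isolation. The first function below mirrors the formula from the class (reconnectDelay * 2^attempt). The second is an optional variant that is not part of the class above: it caps the delay and adds random jitter, a common way to keep many clients from reconnecting in lockstep after a server restart. Its name and the 30-second cap are assumptions.

```typescript
// Delay before reconnect attempt `attempt` (0-based), matching the formula
// baseDelayMs * 2^attempt used by scheduleReconnect().
function backoffDelay(attempt: number, baseDelayMs = 1000): number {
  return baseDelayMs * Math.pow(2, attempt);
}

// Hypothetical variant: cap the delay, then pick a random value in
// [0, capped) so that clients spread their reconnects out ("full jitter").
function backoffDelayWithJitter(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 30_000,
  random: () => number = Math.random,
): number {
  const capped = Math.min(backoffDelay(attempt, baseDelayMs), maxDelayMs);
  return Math.floor(random() * capped);
}

// The five attempts allowed by maxReconnectAttempts = 5
const schedule = Array.from({ length: 5 }, (_, attempt) => backoffDelay(attempt));
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000 ]
```

With the defaults in the class, a client gives up after waiting 31 seconds in total across its five attempts.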

Usage in a React component:

import { useEffect, useState } from 'react';
import { NotificationClient } from './NotificationClient';

export function NotificationBell() {
  const [notifications, setNotifications] = useState<Notification[]>([]);
  const [unreadCount, setUnreadCount] = useState(0);

  useEffect(() => {
    const client = new NotificationClient('wss://api.example.com/ws');
    
    client.subscribe('user:123');
    
    const unsubscribe = client.on('notification', (data) => {
      setNotifications((prev) => [data.notification, ...prev]);
      setUnreadCount((prev) => prev + 1);
    });

    return () => {
      unsubscribe();
    };
  }, []);

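  return (
    <div>
      {/* Placeholder markup: swap in your own bell icon and badge styling */}
      <span>🔔</span>
      {unreadCount > 0 && (
        <span>{unreadCount}</span>
      )}
    </div>
  );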
}
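The client passes whatever JSON.parse returns straight to its handlers. A small typed parser can reject malformed frames before they reach the UI. The sketch below assumes the message fields mirror the server's OutgoingMessage struct (type, channel, data, timestamp); treat that field set as an assumption rather than a published schema.

```typescript
// Message shapes assumed from the server's OutgoingMessage struct.
type ServerMessage =
  | { type: 'connected'; timestamp: number }
  | { type: 'subscribed' | 'unsubscribed'; channel: string; timestamp: number }
  | { type: 'notification'; channel: string; data: string; timestamp: number };

// Returns a validated message, or null if the frame is not usable.
function parseServerMessage(raw: string): ServerMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;

  const { type, channel, data, timestamp } = value as Record<string, unknown>;
  if (typeof timestamp !== 'number') return null;

  if (type === 'connected') {
    return { type, timestamp };
  }
  if ((type === 'subscribed' || type === 'unsubscribed') && typeof channel === 'string') {
    return { type, channel, timestamp };
  }
  if (type === 'notification' && typeof channel === 'string' && typeof data === 'string') {
    return { type, channel, data, timestamp };
  }
  return null; // unknown type or missing fields
}

const parsed = parseServerMessage(
  '{"type":"notification","channel":"user:123","data":"hello","timestamp":1}',
);
if (parsed?.type === 'notification') {
  console.log(parsed.channel, parsed.data); // user:123 hello
}
```

Because ServerMessage is a discriminated union, checking the type field narrows the object, so handlers get compile-time access to exactly the fields that message kind carries.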

Load Testing

Before deploying, let's verify our system can handle the load. Here's a simple load test script:

// loadtest.ts
import WebSocket from 'ws';

const NUM_CLIENTS = 10000;
const SERVER_URL = 'ws://localhost:8080';

async function runLoadTest() {
  const connections: WebSocket[] = [];
  const startTime = Date.now();

  console.log(`Starting load test with ${NUM_CLIENTS} clients...`);

  for (let i = 0; i < NUM_CLIENTS; i++) {
    const ws = new WebSocket(SERVER_URL);
    
    ws.on('open', () => {
      ws.send(JSON.stringify({ 
        type: 'subscribe', 
        channel: `user:${i}`
      }));
    });

    connections.push(ws);

    // Stagger connections to avoid thundering herd
    if ((i + 1) % 100 === 0) {
      await new Promise((r) => setTimeout(r, 10));
      console.log(`Connected ${i + 1} clients...`);
    }
  }

  const elapsed = Date.now() - startTime;
  console.log(`✅ All ${NUM_CLIENTS} clients connected in ${elapsed}ms`);

  // Cleanup
  setTimeout(() => {
    connections.forEach((ws) => ws.close());
    process.exit(0);
  }, 5000);
}

runLoadTest();

Run it with:

npx tsx loadtest.ts

You should see output like:

Starting load test with 10000 clients...
Connected 100 clients...
Connected 200 clients...
...
✅ All 10000 clients connected in 2847ms

Deployment Considerations

When deploying to production, keep these factors in mind:

Horizontal Scaling

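A WebSocket connection stays bound to a single backend for its entire lifetime, so the load balancer must pass the HTTP Upgrade handshake through and tolerate long-lived idle connections. Sticky sessions (ip_hash below) additionally keep a reconnecting client on the same server. Here's an example nginx configuration: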

upstream websocket_servers {
    ip_hash;  # Sticky sessions based on client IP
    server ws1.internal:8080;
    server ws2.internal:8080;
    server ws3.internal:8080;
}

server {
    listen 443 ssl;
    server_name ws.example.com;

    location / {
        proxy_pass http://websocket_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 86400;
    }
}
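With several instances behind the balancer, any backend service can reach every connected client by publishing to Redis, since each instance pattern-subscribes to notifications:*. The sketch below shows a hypothetical publishNotification helper. It only relies on a publish(channel, message) method that returns a promise of the receiver count, which is the shape node-redis v4 clients expose; the stand-in publisher is there so the example runs without a Redis server.

```typescript
// Minimal shape this sketch relies on: publish(channel, message) resolving to
// the number of receivers, as node-redis v4 clients provide.
interface Publisher {
  publish(channel: string, message: string): Promise<number>;
}

// The server listing pattern-subscribes to "notifications:*".
const CHANNEL_PREFIX = 'notifications:';

// Hypothetical helper: serialize a notification and publish it for `target`,
// e.g. target = "user:123" goes out on "notifications:user:123".
async function publishNotification(
  publisher: Publisher,
  target: string,
  notification: object,
): Promise<number> {
  return publisher.publish(CHANNEL_PREFIX + target, JSON.stringify(notification));
}

// Stand-in publisher that records calls; in a real service you would pass a
// connected client from the `redis` package instead.
const sent: Array<[string, string]> = [];
const fakePublisher: Publisher = {
  async publish(channel, message) {
    sent.push([channel, message]);
    return 1;
  },
};

publishNotification(fakePublisher, 'user:123', { title: 'Build finished' })
  .then(() => console.log(sent[0][0])); // notifications:user:123
```

Redis pub/sub is fire-and-forget: a message published while an instance is disconnected from Redis is not replayed, so anything that must survive a restart needs separate storage.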

Monitoring

Add these metrics to your monitoring stack:

  • ws_connections_total — Total active connections
  • ws_messages_sent_total — Messages sent per second
  • ws_message_latency_ms — End-to-end message latency
  • redis_pubsub_messages — Redis pub/sub throughput
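As a starting point, the counters and the latency metric can be tracked in memory before wiring them into a real metrics library. This is a sketch with hypothetical names; it keeps every latency sample, so a production version would need a bounded window or a histogram.

```typescript
// In-memory stand-ins for the metrics listed above (a sketch only).
class NotificationMetrics {
  activeConnections = 0;              // ws_connections_total
  messagesSent = 0;                   // ws_messages_sent_total
  private latenciesMs: number[] = []; // samples for ws_message_latency_ms

  connectionOpened(): void {
    this.activeConnections++;
  }

  connectionClosed(): void {
    this.activeConnections = Math.max(0, this.activeConnections - 1);
  }

  // publishedAtMs: when the notification was created;
  // deliveredAtMs: when it was written to the socket.
  messageSent(publishedAtMs: number, deliveredAtMs: number = Date.now()): void {
    this.messagesSent++;
    this.latenciesMs.push(deliveredAtMs - publishedAtMs);
  }

  // Nearest-rank percentile over the recorded samples, e.g. p = 95.
  latencyPercentile(p: number): number | undefined {
    if (this.latenciesMs.length === 0) return undefined;
    const sorted = [...this.latenciesMs].sort((a, b) => a - b);
    const rank = Math.ceil((p / 100) * sorted.length);
    return sorted[Math.min(sorted.length - 1, Math.max(0, rank - 1))];
  }
}

const metrics = new NotificationMetrics();
metrics.connectionOpened();
[10, 20, 30, 40].forEach((latency) => metrics.messageSent(1000, 1000 + latency));
console.log(metrics.messagesSent);          // 4
console.log(metrics.latencyPercentile(50)); // 20
console.log(metrics.latencyPercentile(95)); // 40
```

Percentiles are usually more informative than an average here, because a handful of slow deliveries can hide behind a healthy mean.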

Conclusion

We've built a scalable notification system that can:

  • Handle thousands of concurrent connections per server
  • Scale horizontally with Redis pub/sub
  • Automatically reconnect on failure
  • Target notifications to specific channels/users

The full source code is available on GitHub.

Have questions or feedback? Reach out on Twitter or leave a comment below.

Complete file structure:

realtime-notifications/
├── src/
│   ├── server.ts
│   ├── client/
│   │   └── NotificationClient.ts
│   └── types/
│       └── index.ts
├── package.json
├── tsconfig.json
└── README.md

Written by Ahmed Fathy

Author at NextGen Softwares
