Real-time chat moderation is harder than it looks.

You’re expected to inspect every message, block abusive users instantly, rate-limit spam, and still keep latency low enough that users don’t feel it. The challenge isn’t just detecting bad messages — it’s doing all of this in real time, across multiple users, without slowing the system down.

In this article, I’ll walk through how I built ChatWarden, a real-time chat moderation system optimized for low latency and horizontal scalability.


Tech Stack Overview

The system is built entirely with free, open-source tools:

  • Python as the core language
  • FastAPI for WebSocket handling
  • Redis Pub/Sub for cross-instance message broadcasting
  • Redis caching for bans and rate limits
  • Redis Cuckoo Filter for constant-time blocked user checks
  • PostgreSQL for persistent ban records
  • Pandas + Joblib for training and exporting the moderation model

System Design Overview

At a high level, the system is event-driven and WebSocket-based.

Each connected user maintains a persistent WebSocket connection to the server. The server keeps track of all active connections in memory, allowing it to push messages to clients instantly without polling or repeated HTTP requests.

However, WebSockets alone are not enough.

Because the application is designed to scale horizontally, multiple FastAPI instances may be running at the same time. A message sent to one server instance must still be visible to users connected to other instances. This is where Redis Pub/Sub comes in.


How Messages Are Shared Between Users

When a user sends a message:

  1. The WebSocket server receives the message.
  2. The message is first checked by the moderation engine.
  3. If the message is allowed, it is published to a Redis Pub/Sub channel.
  4. Every FastAPI instance subscribed to that channel receives the message.
  5. Each instance broadcasts the message to all WebSocket connections it currently holds in memory.

This design ensures that all users, regardless of which server instance they’re connected to, see the same chat messages in real time.

Redis acts as the synchronization layer, while WebSockets handle low-latency message delivery.

Here is how we handle the Redis Pub/Sub mechanism:

import json
import logging
from redis.asyncio import Redis
from typing import List, Dict

logger = logging.getLogger("uvicorn")

class RedisManager:
    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.channel = "chat_global"  
        self.history_key = "chat_history"
        self.history_limit = 50

    async def publish(self, message: dict):
        try:
            await self._add_to_history(message)
            
            await self.redis.publish(self.channel, json.dumps(message))
        except Exception as e:
            logger.error(f"Redis Publish Error: {e}")

    async def subscribe(self):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(self.channel)
        return pubsub

    async def _add_to_history(self, message: dict):
        try:
            val = json.dumps(message)
            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.lpush(self.history_key, val)
                pipe.ltrim(self.history_key, 0, self.history_limit - 1)
                await pipe.execute()
        except Exception as e:
            logger.error(f"Redis History Error: {e}")

    async def get_recent_messages(self) -> List[Dict]:
        try:
            raw_msgs = await self.redis.lrange(self.history_key, 0, -1)
            # LPUSH stores newest first; reverse so history reads oldest → newest
            return [json.loads(m) for m in reversed(raw_msgs)]
        except Exception as e:
            logger.error(f"Get History Error: {e}")
            return []

Active Connection Management

Each FastAPI instance maintains a simple in-memory list of active WebSocket connections. When a user connects, their WebSocket is added to the list. When they disconnect, it’s removed.

Broadcasting a message is as simple as iterating over this list and sending the message to each connected client. This keeps the delivery path fast and predictable.

A FastAPI instance refers to a single running process of the application. Since instances do not share memory, Redis is used to synchronize messages across them.

User A sends a message → Instance 1 receives it
Instance 1 publishes the message to the Redis Pub/Sub channel
All subscribed instances (Instance 1, Instance 2, …) receive the message
Each instance iterates through its own in-memory active_connections list
Each connected WebSocket gets the message

from typing import List

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                # Ignore send failures here; the dead socket is cleaned up on disconnect
                pass
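
One piece the classes above don't show is the glue between them: each server instance needs a background task that subscribes to the Redis channel and rebroadcasts whatever arrives to its local WebSocket connections. Here is a minimal sketch, assuming redis_manager and manager are the RedisManager and ConnectionManager instances described in this article:

async def redis_listener(redis_manager: RedisManager, manager: ConnectionManager):
    # One listener per instance: subscribe once, then rebroadcast forever
    pubsub = await redis_manager.subscribe()
    async for raw in pubsub.listen():
        if raw.get("type") != "message":
            continue  # skip subscribe/unsubscribe confirmations
        data = raw["data"]
        if isinstance(data, bytes):
            data = data.decode()
        # Fan the message out to every WebSocket this instance holds in memory
        await manager.broadcast(data)

In the real project this would likely be launched from FastAPI's startup hook with something like asyncio.create_task(redis_listener(redis_manager, manager)).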

Why This Approach Works

  • Low latency: WebSockets avoid repeated HTTP requests.
  • Scalability: Redis Pub/Sub synchronizes multiple server instances.
  • Fault isolation: Each server instance is stateless.
  • Performance: Redis handles all ephemeral state efficiently.

Rate Limiting with Sliding Window

Blocking bad words is one part of moderation, but preventing spam is just as important. Users can be creative — sending messages rapidly, repeating toxic content, or flooding the chat. To handle this, I implemented a sliding window rate limiter using Redis.

How It Works

Think of it like a timeline for each user:

  1. Every time a user sends a message, we record the timestamp in Redis.
  2. Redis stores these timestamps in a Sorted Set (ZSET). Each ZSET is unique per user, e.g., violation_window:user123.
  3. The score in the ZSET is the timestamp, and the value can be a unique ID for that message.

Because ZSETs are automatically sorted by score, we can efficiently see how many messages a user has sent in the recent time window.

Enforcing the Sliding Window

  • Every new message triggers two operations:
    1. Remove old messages that are outside the sliding window (e.g., older than 10 seconds) using ZREMRANGEBYSCORE.
    2. Count remaining messages with ZCARD.
  • If the count exceeds the limit (for example, more than 5 messages in 10 seconds), the user is considered spamming.
  • Repeated offenses can trigger warnings or automatic bans, which are immediately recorded in Redis and optionally in Postgres for persistence.

Why Redis ZSET Makes This Fast

  • Operations are O(log N), so even with thousands of users, it’s lightning fast.
  • Each user has their own key, so checking one user doesn’t block others.
  • Combined with WebSockets, this ensures real-time enforcement, keeping the chat clean and low-latency.

Here is the sliding window limiter implementation:

import time
import uuid
import logging
from redis.asyncio import Redis
from ..config import Config

logger = logging.getLogger("uvicorn")

class SlidingWindowLimiter:
    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.window_seconds = Config.RATE_LIMIT_WINDOW
        self.max_offenses = Config.RATE_LIMIT_MAX_OFFENSES

    async def record_violation(self, user_id: str) -> bool:
        key = f"violation_window:{user_id}"
        now = time.time()
        cutoff = now - self.window_seconds

        try:
            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.zremrangebyscore(key, 0, cutoff)
                # Store unique event
                pipe.zadd(key, {str(uuid.uuid4()): now})
                # Set expiry to clean up old keys
                pipe.expire(key, self.window_seconds + 60)
                # Count current violations
                pipe.zcard(key)
                
                results = await pipe.execute()
                
            total_offenses = results[3]
            
            logger.info(f"User {user_id} violations: {total_offenses}/{self.max_offenses}")
            
            return total_offenses >= self.max_offenses

        except Exception as e:
            logger.error(f"Rate limit error: {e}")
            return False
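
The limiter reads its thresholds from a shared Config object imported at the top of the module. The article doesn't show that file; a sketch using the example numbers from this section (BAN_DURATION is an assumption, used later by the WebSocket endpoint) might look like:

class Config:
    # Illustrative values only; the real project defines these in its own config module
    RATE_LIMIT_WINDOW = 10        # sliding window length in seconds
    RATE_LIMIT_MAX_OFFENSES = 5   # violations allowed inside the window
    BAN_DURATION = 60 * 60        # seconds a temporary ban key lives in Redis (assumed)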

Example Flow

User sends message → add timestamp to ZSET
Remove timestamps older than 10 seconds
Count remaining messages → if above limit, block or ban

This approach lets the system track message bursts accurately, rather than relying on fixed counters, giving more precise control over spam behavior.


Banned Users and the Cuckoo Filter

Once a user repeatedly violates the chat rules by sending abusive messages or spamming, they need to be blocked immediately. But with thousands of users and potentially millions of messages, doing this efficiently becomes a challenge. That’s where the Cuckoo Filter comes in.

What is a Cuckoo Filter?

A Cuckoo Filter is a probabilistic data structure similar to a Bloom filter, designed to quickly check membership:

  • Can answer the question: “Has this user been banned?” almost instantly.
  • Uses far less memory than a traditional set or a database table.
  • Supports deletions, which Bloom filters cannot do efficiently.

In short, it’s fast, memory-efficient, and flexible.

Why Not Just Redis or a Database?

  • Redis SET or Postgres could store banned users, but:
    • Checking thousands of users in memory or querying a database on every message is slower.
    • Scaling horizontally increases cost: you’d need more memory or compute on each instance to hold and query the full banned list.
    • Every read/write to a database adds latency, which we can’t afford in a real-time system.
  • Cuckoo Filter solves this:
    • O(1) lookup → instant ban checks per message
    • Extremely memory-efficient → can handle millions of users without heavy resource use
    • Supports deletions → temporary bans are easy to manage

Here is the Cuckoo Filter wrapper, which falls back to a plain Redis Set when the RedisBloom module isn’t available:

import logging
from redis.asyncio import Redis
from redis.exceptions import ResponseError

logger = logging.getLogger("uvicorn")

class CuckooFilter:

    def __init__(self, redis_client: Redis, key_name: str = "blocked_users_cuckoo"):
        self.redis = redis_client
        self.key = key_name
        self._has_redis_stack = None  

    async def _check_stack_availability(self):
        if self._has_redis_stack is not None:
            return self._has_redis_stack
        
        try:
            await self.redis.execute_command("CF.EXISTS", self.key, "dummy")
            self._has_redis_stack = True
            logger.info("RedisBloom (Cuckoo Filter) detected.")
        except ResponseError as e:
            if "unknown command" in str(e).lower():
                self._has_redis_stack = False
                logger.warning("RedisBloom not detected. Falling back to standard Redis Set.")
            else:
                raise e
        return self._has_redis_stack

    async def add(self, item: str):
        if await self._check_stack_availability():
            try:
                await self.redis.execute_command("CF.ADD", self.key, item)
            except ResponseError:
                pass
        else:
            await self.redis.sadd(self.key, item)

    async def contains(self, item: str) -> bool:
        if await self._check_stack_availability():
            return bool(await self.redis.execute_command("CF.EXISTS", self.key, item))
        else:
            return bool(await self.redis.sismember(self.key, item))
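
A quick usage sketch (assuming a Redis server on localhost and the redis.asyncio client used above); the same two calls work whether RedisBloom is installed or the class has fallen back to a plain Set:

import asyncio
from redis.asyncio import Redis

async def demo():
    bans = CuckooFilter(Redis())                     # assumes Redis on localhost:6379
    await bans.add("user123")
    print(await bans.contains("user123"))            # True
    print(await bans.contains("someone_else"))       # False (small false-positive chance)

asyncio.run(demo())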

How It Fits in the System

  1. User connects via WebSocket
    • First check: Is their user_id in the Cuckoo Filter?
    • If yes → reject connection instantly (no DB call needed)
  2. User is banned during a session
    • Their user_id is added to the Cuckoo Filter immediately
    • Redis also stores a TTL-backed ban key alongside the filter, so temporary bans expire automatically
  3. Scaling
    • Since lookups are O(1) and memory is small, you can run many server instances without slowing down
    • Helps reduce cost: no need to store massive sets in memory or hammer the database

Real-world Impact

Using the Cuckoo Filter allows ChatWarden to:

  • Instantly reject banned users before they even connect
  • Handle millions of users efficiently
  • Keep the system fast and low-latency even under heavy load
  • Reduce operational costs compared to storing large sets in memory or relying on frequent DB queries

Message Lifecycle: From User to Chat

Now that we’ve covered the tech and core components, let’s walk through the full journey of a message in ChatWarden — from the moment a user sends it to when it appears in the chat (or gets blocked).

  1. User Sends a Message
    • The user types a message and sends it via WebSocket.
    • The WebSocket server receives the message instantly.
  2. Banned User Check
    • Before doing any processing, the system checks the Cuckoo Filter to see if the user is banned.
    • If the user is blocked, the connection can be rejected immediately — saving compute time and keeping the chat safe.
  3. Moderation Checks
    • Keyword Filter: Quickly scans for known abusive words or phrases.
    • AI Moderator: Runs a trained ML model to detect toxicity, threats, or harassment.
    • If the message is flagged, the user may get a warning, or their action may trigger rate-limiting or a ban.
  4. Rate Limiting / Sliding Window
    • Each user’s recent message timestamps are tracked in a Redis ZSET.
    • If a user exceeds the allowed messages in the sliding window, they are temporarily blocked or banned.
  5. Publishing and Broadcasting
    • Messages that pass moderation are published to a Redis Pub/Sub channel.
    • All FastAPI instances subscribed to that channel receive the message.
    • Each instance iterates through its in-memory list of active WebSocket connections and broadcasts the message to the clients it manages.
  6. Persistence and Logging
    • Temporary bans and rate-limit counters live in Redis for fast checks.
    • Permanent bans, warnings, and moderation events are logged in PostgreSQL for auditing and future review.

This flow ensures that every message is inspected, safe messages reach all users quickly, and abusive users are handled in real time, all while keeping latency low.
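
Step 6 relies on a BlockedUser model that the endpoint below writes to. Its definition isn’t shown in this article, but a minimal SQLAlchemy sketch might look like the following (columns beyond user_id, reason, and banned_until are assumptions):

from datetime import datetime
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class BlockedUser(Base):
    __tablename__ = "blocked_users"

    id = Column(Integer, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    reason = Column(String, nullable=True)
    banned_until = Column(DateTime, nullable=True)  # None means a permanent ban
    created_at = Column(DateTime, default=datetime.utcnow)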

WebSocket Endpoint Code

# Assumes module-level objects created at startup elsewhere in the project: manager (ConnectionManager),
# redis_manager (RedisManager), rate_limiter (SlidingWindowLimiter), cuckoo_filter (CuckooFilter),
# moderation_engine, redis_client, and the get_db session dependency. Standard imports omitted for brevity.
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str, db: AsyncSession = Depends(get_db)):
    # 1. Immediate Check against Cuckoo Filter
    if await cuckoo_filter.contains(user_id):
        logger.info(f"Connection Rejected (Cuckoo): {user_id}")
        await websocket.close(code=4003, reason="You are banned.")
        return

    # 2. Check traditional Redis/DB ban
    ban_reason = await redis_client.get(f"banned:{user_id}")
    if ban_reason:
        await websocket.close(code=4003, reason=f"Banned: {ban_reason}")
        return

    await manager.connect(websocket)
    
    try:
        # Send history on connect
        history = await redis_manager.get_recent_messages()
        await websocket.send_text(json.dumps({"type": "history", "data": history}))

        while True:
            data = await websocket.receive_text()
            
            try:
                payload = json.loads(data)
                text = payload.get("text", "")
            except (json.JSONDecodeError, AttributeError):
                # Fall back to treating the raw payload as plain text
                text = data
            
            if not text.strip():
                continue

            # 3. Moderation
            mod_result = moderation_engine.moderate(text)
            
            if mod_result["status"] == "FLAGGED":
                logger.warning(f"Flagged: {user_id} - {text}")
                
                await websocket.send_text(json.dumps({
                    "type": "system", 
                    "status": "BLOCKED", 
                    "reason": mod_result["reason"]
                }))
                
                # 4. Rate Limiting Logic on Violation
                is_rate_limited = await rate_limiter.record_violation(user_id)
                if is_rate_limited:
                    logger.warning(f"BANNING USER: {user_id}")
                    reason = "Excessive Toxic Behavior"
                    
                    await redis_client.setex(f"banned:{user_id}", Config.BAN_DURATION, reason)
                    
                    # Add to Filter for instant future rejection
                    await cuckoo_filter.add(user_id)
                    
                    # Persist to DB
                    new_ban = BlockedUser(
                        user_id=user_id,
                        reason=reason,
                        banned_until=None
                    )
                    db.add(new_ban)
                    await db.commit()
                    
                    await websocket.close(code=4003, reason="Banned for toxicity.")
                    break
            else:
                # 5. Broadcast if Safe
                msg_obj = {
                    "type": "chat",
                    "user_id": user_id,
                    "text": text,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }
                await redis_manager.publish(msg_obj)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"WS Error: {e}")
        manager.disconnect(websocket)
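
The endpoint delegates to moderation_engine.moderate(text), which isn’t shown in this walkthrough. Based on the stack described earlier (a keyword filter plus a Joblib-exported model), a hedged sketch of the interface the endpoint expects, assuming a scikit-learn text-classification pipeline, could look like this:

import joblib

class ModerationEngine:
    # Sketch only: the real keyword list, model, and labels are project-specific
    def __init__(self, model_path: str = "moderation_model.joblib"):
        self.blocked_keywords = {"examplebadword"}   # placeholder terms
        self.model = joblib.load(model_path)         # pipeline exported with Joblib

    def moderate(self, text: str) -> dict:
        lowered = text.lower()
        # Cheap keyword pass first
        if any(word in lowered for word in self.blocked_keywords):
            return {"status": "FLAGGED", "reason": "Blocked keyword"}
        # ML pass for nuanced toxicity (assumes label 1 == toxic)
        if self.model.predict([text])[0] == 1:
            return {"status": "FLAGGED", "reason": "Toxic content detected"}
        return {"status": "OK", "reason": None}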

Wrap-Up: Lessons Learned and Takeaways

Building a system like ChatWarden taught me that real-time chat moderation is more than just detecting bad words. It’s a combination of low-latency message delivery, state management, AI moderation, and system-level optimizations.

Key takeaways from building this project:

  • WebSockets + Redis Pub/Sub make multi-instance real-time messaging scalable.
  • Redis ZSETs provide precise, fast rate-limiting for spam control.
  • Cuckoo Filters allow instant banned user checks without consuming huge amounts of memory.
  • Separation of ephemeral and persistent state ensures low latency and reliability.
  • A small, efficient AI model can catch nuanced abusive behavior, complementing keyword filters.

Real-time moderation systems are challenging, but with the right architecture, you can scale, stay responsive, and keep your community safe — all with open-source tools.