
Market Data Infrastructure: WebSocket Patterns That Scale

Deep dive into WebSocket reliability, sequence gap detection, Kubernetes patterns, and monitoring for multi-exchange market data systems.

6 min
#trading #infrastructure #websocket #kubernetes #monitoring #market-data

Exchanges send orderbook updates at microsecond intervals. If your infrastructure can’t keep up, your trading system is flying blind.

I’ve built market data infrastructure handling 100K+ messages/second across 12+ exchanges. The difference between “it works” and “production-ready” is infrastructure reliability, not algorithm cleverness.

This post covers the infrastructure patterns: WebSocket reliability, Kubernetes deployment, monitoring, and recovery.

The Problem {#the-problem}

Market data infrastructure challenges:

| Challenge | Impact |
| --- | --- |
| WebSocket disconnect | Silent, stale data |
| Sequence gaps | Wrong orderbook state |
| Rate limits | Forced reconnection |
| Multi-exchange | 12+ connections, each different |
| High message rates | 10K-100K msgs/sec peak |

The mistake: Teams focus on parsing speed while WebSocket connections silently die. Your 1µs parser is useless if the data is 10 seconds stale.

For kernel-level optimization, see Network Deep Dive. For monitoring patterns, see Trading Metrics.

WebSocket Reliability Patterns {#websocket}

The Core Problem

WebSockets silently disconnect. TCP keepalive isn’t reliable for application-level health. Exchanges can:

  • Rate-limit you
  • Drop connections without FIN
  • Send stale data during issues

Pattern 1: Heartbeat Monitoring

import asyncio
import logging
import time

import websockets
from prometheus_client import Gauge, Counter

logger = logging.getLogger(__name__)

WS_LATENCY = Gauge('md_heartbeat_latency_ms', 'WebSocket heartbeat latency', ['exchange'])
LAST_MESSAGE = Gauge('md_last_message_timestamp', 'Last message Unix timestamp', ['exchange'])
RECONNECTS = Counter('md_reconnects_total', 'WebSocket reconnections', ['exchange'])

class ResilientWebSocket:
    def __init__(self, url: str, exchange: str, heartbeat_interval: float = 30):
        self.url = url
        self.exchange = exchange
        self.heartbeat_interval = heartbeat_interval
        self.ws = None
        self.last_message_time = 0.0
        self._reconnect_attempts = 0
    
    async def connect(self):
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5
                ) as ws:
                    self.ws = ws
                    self._reconnect_attempts = 0  # Reset backoff after a good connection
                    await self._run_loop()
            except Exception:
                logger.exception("%s connection failed", self.exchange)
                RECONNECTS.labels(exchange=self.exchange).inc()
                delay = self._backoff()
                self._reconnect_attempts += 1
                await asyncio.sleep(delay)
    
    async def _run_loop(self):
        # Run the reader and both watchdogs together; the first exception
        # (stale data, failed heartbeat) tears the whole connection down.
        self.last_message_time = time.time()  # Don't trip staleness before first message
        tasks = [
            asyncio.create_task(self._read_loop()),
            asyncio.create_task(self._heartbeat_loop()),
            asyncio.create_task(self._staleness_check()),
        ]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
            for task in done:
                task.result()  # Re-raise whatever failed first
        finally:
            for task in tasks:
                task.cancel()
    
    async def _read_loop(self):
        async for message in self.ws:
            self.last_message_time = time.time()
            LAST_MESSAGE.labels(exchange=self.exchange).set(self.last_message_time)
            await self._process_message(message)
    
    async def _heartbeat_loop(self):
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            start = time.time()
            pong = await self.ws.ping()
            await asyncio.wait_for(pong, timeout=10)  # TimeoutError propagates as failure
            latency = (time.time() - start) * 1000
            WS_LATENCY.labels(exchange=self.exchange).set(latency)
    
    async def _staleness_check(self):
        while True:
            await asyncio.sleep(5)
            if time.time() - self.last_message_time > 10:
                raise ConnectionError("Stale connection")
    
    def _backoff(self) -> float:
        # Exponential backoff: 1s, 2s, 4s, 8s, capped at 30s
        return min(30, 2 ** self._reconnect_attempts)
    
    async def _process_message(self, message):
        raise NotImplementedError  # Subclass per exchange

Pattern 2: Connection Pool

Multiple connections for redundancy:

class ConnectionPool:
    def __init__(self, url: str, exchange: str, pool_size: int = 2):
        # Label each connection separately so its metrics are distinguishable
        self.connections = [
            ResilientWebSocket(url, f"{exchange}-{i}")
            for i in range(pool_size)
        ]
    
    async def start(self):
        tasks = [conn.connect() for conn in self.connections]
        await asyncio.gather(*tasks)
    
    def get_best_message(self, messages: list):
        # When both connections deliver, use the most recent update
        return max(messages, key=lambda m: m.get('sequence', 0))
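With two live feeds, most updates arrive twice. A small deduplicator lets the consumer act on whichever copy arrives first and drop the rest. This is a hypothetical sketch; it assumes each message carries a monotonically increasing `sequence` field, which in practice varies by exchange:

```python
class SequenceDeduplicator:
    """Drops messages already seen from a redundant feed.

    Assumes a monotonically increasing 'sequence' field (exchange-specific).
    """
    def __init__(self):
        self.last_seen = -1

    def accept(self, msg: dict) -> bool:
        seq = msg.get('sequence', -1)
        if seq <= self.last_seen:
            return False  # Duplicate or older copy from the slower connection
        self.last_seen = seq
        return True
```

The fastest connection wins every race; the slower copy is silently discarded, so failover between pool members is implicit.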

Kubernetes Deployment {#kubernetes}

Fault Isolation: Per-Exchange Deployments

# binance-connector.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: md-binance
  namespace: market-data
spec:
  replicas: 2  # Hot standby
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0  # Zero downtime
      maxSurge: 1
  selector:
    matchLabels:
      app: market-data
      exchange: binance
  template:
    metadata:
      labels:
        app: market-data
        exchange: binance
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: connector
        image: market-data-connector:v1.2.3
        env:
        - name: EXCHANGE
          value: "binance"
        - name: SYMBOLS
          value: "BTCUSDT,ETHUSDT,SOLUSDT"
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "1000m"
            memory: "1Gi"
        ports:
        - containerPort: 9090
          name: metrics
        readinessProbe:
          httpGet:
            path: /ready
            port: 9090
          periodSeconds: 2
          failureThreshold: 3
        livenessProbe:
          httpGet:
            path: /health
            port: 9090
          initialDelaySeconds: 30
          periodSeconds: 10
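The probes above assume the connector serves `/ready` and `/health` on its metrics port. A minimal stdlib sketch of that (the endpoint names and the 10-second staleness limit are assumptions matching the manifest, not anything Kubernetes mandates): readiness fails when data goes stale so Kubernetes stops routing to a pod with a dead feed, while liveness only confirms the process is up and lets the in-process reconnect logic do its job.

```python
import http.server
import json
import threading
import time

class HealthState:
    """Shared state: the WebSocket loop stamps it, the probes read it."""
    def __init__(self, staleness_limit_s: float = 10.0):
        self.staleness_limit_s = staleness_limit_s
        self.last_message_time = 0.0

    def fresh(self) -> bool:
        return time.time() - self.last_message_time < self.staleness_limit_s

STATE = HealthState()

class ProbeHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        # /health: the process is alive (liveness).
        # /ready: market data is fresh, so this pod may serve traffic (readiness).
        ok = self.path == "/health" or (self.path == "/ready" and STATE.fresh())
        self.send_response(200 if ok else 503)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"ok": ok}).encode())

    def log_message(self, fmt, *args):
        pass  # Keep probe traffic out of application logs

def start_probe_server(port: int = 9090) -> http.server.HTTPServer:
    server = http.server.HTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Keeping liveness cheap and readiness strict is deliberate: a restarted pod loses its orderbook state, so you want traffic withheld (readiness) far more often than a full restart (liveness).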

Terraform for Multi-Exchange

locals {
  exchanges = {
    binance  = { ws = "wss://stream.binance.com:9443", symbols = ["BTCUSDT", "ETHUSDT"] }
    coinbase = { ws = "wss://ws-feed.exchange.coinbase.com", symbols = ["BTC-USD", "ETH-USD"] }
    kraken   = { ws = "wss://ws.kraken.com", symbols = ["XBT/USD", "ETH/USD"] }
  }
}

resource "kubernetes_deployment" "market_data" {
  for_each = local.exchanges
  
  metadata {
    name      = "md-${each.key}"
    namespace = "market-data"
  }
  
  spec {
    replicas = 2
    
    selector {
      match_labels = {
        app      = "market-data"
        exchange = each.key
      }
    }
    
    template {
      metadata {
        labels = {
          app      = "market-data"
          exchange = each.key
        }
      }
      
      spec {
        container {
          name  = "connector"
          image = "market-data-connector:v1.2.3"
          
          env {
            name  = "EXCHANGE"
            value = each.key
          }
          
          env {
            name  = "WS_ENDPOINT"
            value = each.value.ws
          }
          
          env {
            name  = "SYMBOLS"
            value = join(",", each.value.symbols)
          }
        }
      }
    }
  }
}

For StatefulSet patterns (when needed), see Kubernetes for Trading.

Sequence Gap Detection {#sequence}

The Problem

Missing a delta update means wrong orderbook state. You buy based on a stale price.

Implementation

from prometheus_client import Counter

SEQUENCE_GAPS = Counter('md_sequence_gaps_total', 'Sequence gap events', ['exchange', 'symbol'])

class OrderbookManager:
    def __init__(self, exchange: str, symbol: str, client):
        self.exchange = exchange
        self.symbol = symbol
        self.client = client  # REST client used to fetch snapshots on resync
        self.expected_sequence = 0
        self.book = {'bids': {}, 'asks': {}}
        self.recovering = False
    
    async def handle_message(self, msg: dict):
        # Field name varies by exchange: 'sequence', 'u', or 'lastUpdateId'
        seq = msg.get('sequence', msg.get('u', msg.get('lastUpdateId')))
        
        if self.expected_sequence == 0:
            # First message: take it as the initial state
            self.expected_sequence = seq
            self._apply_delta(msg)
            return
        
        if seq != self.expected_sequence + 1:
            # Gap detected: the local book can no longer be trusted
            SEQUENCE_GAPS.labels(
                exchange=self.exchange,
                symbol=self.symbol
            ).inc()
            await self._resync()
            return
        
        self._apply_delta(msg)
        self.expected_sequence = seq
    
    async def _resync(self):
        if self.recovering:
            return
        
        self.recovering = True
        try:
            snapshot = await self.client.get_depth(self.symbol)
            self.book = {
                'bids': dict(snapshot['bids']),
                'asks': dict(snapshot['asks']),
            }
            self.expected_sequence = snapshot['lastUpdateId']
        finally:
            self.recovering = False
    
    def _apply_delta(self, delta: dict):
        for side in ('bids', 'asks'):
            for price, qty in delta.get(side, []):
                if qty == 0:
                    self.book[side].pop(price, None)
                else:
                    self.book[side][price] = qty
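Gap detection only catches missing deltas. Some venues (Kraken, OKX) also publish an orderbook checksum so you can detect divergence without waiting for a gap. The real string formats are exchange-specific, so this sketch uses a simplified CRC32 scheme over the top levels; treat the format as an assumption:

```python
import zlib

def book_checksum(book: dict, depth: int = 10) -> int:
    # Simplified scheme: top N asks ascending, then top N bids descending,
    # serialized as "price:qty" pairs. Real exchange formats differ.
    asks = sorted(book['asks'].items())[:depth]
    bids = sorted(book['bids'].items(), reverse=True)[:depth]
    payload = ":".join(f"{p}:{q}" for p, q in asks + bids)
    return zlib.crc32(payload.encode())
```

Compare the local checksum against the one in the feed; a mismatch routes through the same resync path as a sequence gap.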

Recovery Strategies

| Strategy | When to Use |
| --- | --- |
| Full snapshot | Single gap, quick recovery |
| Reconnect | Persistent gaps |
| Switch exchange | Exchange issues |
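The escalation between these strategies can be a small policy object: occasional gaps get a snapshot, a burst of gaps forces a reconnect, and a feed that keeps gapping gets failed over. A sketch, where the window and thresholds are illustrative assumptions rather than tuned values:

```python
import time

class RecoveryPolicy:
    """Escalates snapshot -> reconnect -> switch_exchange by recent gap count.

    Thresholds are illustrative assumptions, not tuned values.
    """
    def __init__(self, window_s: float = 60.0,
                 reconnect_after: int = 3, failover_after: int = 10):
        self.window_s = window_s
        self.reconnect_after = reconnect_after
        self.failover_after = failover_after
        self.gap_times = []

    def on_gap(self, now=None) -> str:
        now = time.monotonic() if now is None else now
        # Keep only gaps inside the rolling window
        self.gap_times = [t for t in self.gap_times if now - t <= self.window_s]
        self.gap_times.append(now)
        n = len(self.gap_times)
        if n >= self.failover_after:
            return "switch_exchange"
        if n >= self.reconnect_after:
            return "reconnect"
        return "snapshot"
```

Call `on_gap()` wherever the gap counter increments and dispatch on the returned action; the rolling window means a quiet hour forgives an earlier bad minute.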

Monitoring {#monitoring}

Essential Metrics

from prometheus_client import Histogram, Gauge, Counter

# Message processing latency
MSG_LATENCY = Histogram(
    'md_message_latency_seconds',
    'Message processing time',
    ['exchange'],
    buckets=[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05]
)

# Message rate
MSG_RATE = Counter(
    'md_messages_total',
    'Total messages processed',
    ['exchange', 'type']
)

# Connection state
CONNECTION_STATE = Gauge(
    'md_connection_state',
    'WebSocket state (1=connected, 0=disconnected)',
    ['exchange']
)

# Orderbook depth
BOOK_DEPTH = Gauge(
    'md_orderbook_depth',
    'Number of price levels',
    ['exchange', 'symbol', 'side']
)
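Prometheus derives message rate at scrape time via `rate(md_messages_total[1m])`, but a scrape interval is a long time for a dead feed. For in-process decisions (e.g. triggering a reconnect before the dashboard notices), a pure-Python sliding-window rate works; this is a sketch, not part of any metrics library:

```python
import time
from collections import deque

class SlidingRate:
    """Messages per second over a rolling window, computed in-process."""
    def __init__(self, window_s: float = 10.0):
        self.window_s = window_s
        self.times = deque()

    def record(self, now=None):
        now = time.monotonic() if now is None else now
        self.times.append(now)
        self._evict(now)

    def rate(self, now=None) -> float:
        now = time.monotonic() if now is None else now
        self._evict(now)
        return len(self.times) / self.window_s

    def _evict(self, now: float):
        # Drop timestamps that have aged out of the window
        while self.times and now - self.times[0] > self.window_s:
            self.times.popleft()
```

A rate collapsing toward zero on a symbol that normally streams thousands of updates per second is often the earliest disconnect signal you get.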

Grafana Dashboard

{
  "title": "Market Data Infrastructure",
  "panels": [
    {
      "title": "Message Rate by Exchange",
      "type": "graph",
      "targets": [{
        "expr": "rate(md_messages_total[1m])"
      }]
    },
    {
      "title": "Sequence Gaps (1h)",
      "type": "stat",
      "targets": [{
        "expr": "increase(md_sequence_gaps_total[1h])"
      }]
    },
    {
      "title": "Connection State",
      "type": "table",
      "targets": [{
        "expr": "md_connection_state"
      }]
    },
    {
      "title": "Message Latency P99",
      "type": "stat",
      "targets": [{
        "expr": "histogram_quantile(0.99, rate(md_message_latency_seconds_bucket[5m]))"
      }]
    }
  ]
}

For complete monitoring patterns, see Trading Metrics.

AWS Network Optimization {#aws}

Placement for Low Latency

resource "aws_placement_group" "market_data" {
  name     = "market-data-cluster"
  strategy = "cluster"
}

resource "aws_eks_node_group" "market_data" {
  cluster_name    = aws_eks_cluster.main.name
  node_group_name = "market-data"
  
  instance_types = ["c6in.xlarge"]  # Network-optimized
  
  scaling_config {
    desired_size = 3
    min_size     = 2
    max_size     = 5
  }
  
  # Use placement group
  launch_template {
    id      = aws_launch_template.market_data.id
    version = "$Latest"
  }
}

resource "aws_launch_template" "market_data" {
  name_prefix   = "market-data-"
  instance_type = "c6in.xlarge"
  
  placement {
    group_name = aws_placement_group.market_data.name
  }
}

For kernel-level network tuning, see Network Deep Dive.

Design Philosophy {#design-philosophy}

Reliability Over Speed

Market data parsing can be sub-microsecond. It doesn’t matter if:

  • WebSocket disconnected 10 seconds ago
  • Sequence gaps corrupted your orderbook
  • Rate limits prevented reconnection

Priority order:

  1. Reliability: Data is current and correct
  2. Completeness: No gaps in sequence
  3. Speed: Fast processing
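This priority order can be enforced mechanically: gate every downstream action on data freshness before worrying about how fast you act. A sketch, where the 500 ms budget is an illustrative assumption:

```python
import time

class FreshnessGate:
    """Refuses to act on market data older than a freshness budget."""
    def __init__(self, max_age_s: float = 0.5):
        self.max_age_s = max_age_s
        self.last_update = 0.0

    def on_update(self, now=None):
        # Stamp on every applied orderbook update
        self.last_update = time.monotonic() if now is None else now

    def usable(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        return (now - self.last_update) <= self.max_age_s
```

Check `usable()` before quoting or sending orders; when it fails, pulling quotes beats trading on a book you can no longer trust.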

Fault Isolation

Each exchange is different:

  • Different rate limits
  • Different message formats
  • Different failure modes

Design principle: Binance down shouldn’t affect Coinbase. Separate deployments, separate failure domains.


Audit Your Infrastructure

Sub-millisecond orderbook reconstruction requires a properly tuned kernel. Run latency-audit to check your Linux settings before optimizing your matching engine.

pip install latency-audit && latency-audit

Reading Path

Continue exploring with these related deep dives:

| Topic | Next Post |
| --- | --- |
| NIC offloads, IRQ affinity, kernel bypass | Network Optimization: Kernel Bypass and the Art of Busy Polling |
| THP, huge pages, memory locking, pre-allocation | Memory Tuning for Low-Latency: The THP Trap and HugePage Mastery |
| Measuring without overhead using eBPF | eBPF Profiling: Nanoseconds Without Adding Any |
| Design philosophy & architecture decisions | Trading Infrastructure: First Principles That Scale |
| SLOs, metrics that matter, alerting | Trading Metrics: What SRE Dashboards Miss |