
Overview

Express Zod API supports real-time data streaming using Server-Sent Events (SSE). This lightweight alternative to WebSockets enables your server to push updates to clients over a standard HTTP connection.
For bidirectional communication, consider Zod Sockets, a companion library for WebSocket support.

What are Server-Sent Events?

Server-Sent Events provide:
  • Unidirectional server-to-client communication
  • Automatic reconnection
  • Event-based message delivery
  • Built-in browser support via EventSource
  • Lower overhead than WebSockets for one-way streaming
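
To make "event-based message delivery" concrete, the sketch below serializes one named event into the text/event-stream wire format that EventSource understands. The helper name formatSseEvent is hypothetical and is not part of Express Zod API, which formats events internally; this only illustrates the shape of the data on the wire.

```typescript
// Serialize one event into the text/event-stream wire format.
// `formatSseEvent` is a hypothetical helper, not part of express-zod-api.
function formatSseEvent(event: string, data: unknown): string {
  // JSON.stringify never emits raw newlines, so the payload fits on one "data:" line.
  // It returns undefined for unsupported values, so fall back to "null".
  const payload = JSON.stringify(data) ?? "null";
  // The trailing blank line terminates the event.
  return `event: ${event}\ndata: ${payload}\n\n`;
}

const timeFrame = formatSseEvent("time", 1700000000000);
const statusFrame = formatSseEvent("status", { activeUsers: 3 });
```

Each frame is plain text, which is why payloads are typically JSON strings and why the stream can travel over an ordinary HTTP response.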

Creating an Event Stream

Use EventStreamFactory to create streaming endpoints:
import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";
import { setTimeout } from "node:timers/promises";

// Define your events schema
const eventsFactory = new EventStreamFactory({
  time: z.number().int().positive(),
});

// Build the streaming endpoint
const subscriptionEndpoint = eventsFactory.buildVoid({
  input: z.object({
    // Query string values arrive as strings, so coerce before validating
    interval: z.coerce.number().int().positive().default(1000),
  }),
  handler: async ({ input, ctx: { emit, isClosed, signal } }) => {
    while (!isClosed()) {
      // Emit events to the stream
      emit("time", Date.now());
      
      // Wait before next event
      await setTimeout(input.interval);
    }
  },
});

Multiple Event Types

Define multiple event types in a single stream:
import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";

const eventsFactory = new EventStreamFactory({
  // User events
  userJoined: z.object({
    userId: z.string(),
    username: z.string(),
    timestamp: z.number(),
  }),
  userLeft: z.object({
    userId: z.string(),
    timestamp: z.number(),
  }),
  
  // Message events
  message: z.object({
    id: z.string(),
    userId: z.string(),
    text: z.string(),
    timestamp: z.number(),
  }),
  
  // Status events
  status: z.object({
    activeUsers: z.number(),
    serverLoad: z.number(),
  }),
});

const chatStreamEndpoint = eventsFactory.buildVoid({
  input: z.object({
    roomId: z.string(),
  }),
  handler: async ({ input, ctx: { emit, isClosed, signal }, logger }) => {
    const room = await joinRoom(input.roomId);
    
    // Emit initial status
    emit("status", {
      activeUsers: room.users.length,
      serverLoad: 0.5,
    });
    
    // Listen for room events
    room.on("userJoined", (user) => {
      if (!isClosed()) {
        emit("userJoined", {
          userId: user.id,
          username: user.name,
          timestamp: Date.now(),
        });
      }
    });
    
    room.on("message", (msg) => {
      if (!isClosed()) {
        emit("message", msg);
      }
    });
    
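    // Keep the handler pending until the client disconnects;
    // returning earlier would end the stream
    await new Promise<void>((resolve) => {
      signal.addEventListener("abort", () => {
        logger.info("Client disconnected");
        room.leave();
        resolve();
      });
    });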
  },
});

Client-Side Consumption

Consume SSE streams using the native EventSource API:
// Vanilla JavaScript
const source = new EventSource(
  "https://api.example.com/v1/events/time?interval=1000"
);

source.addEventListener("time", (event) => {
  const timestamp = JSON.parse(event.data);
  console.log("Server time:", new Date(timestamp));
});

source.addEventListener("error", (error) => {
  console.error("Connection error:", error);
  // EventSource retries on its own; calling close() opts out of reconnection
  source.close();
});

// Close when done
// source.close();
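
Outside the browser, or when debugging a stream by hand, it can help to see roughly what EventSource does with the incoming text. The following is a minimal sketch of a frame parser, assuming the input contains only complete frames; parseSseFrames and ParsedEvent are hypothetical names, and the "id" and "retry" fields are deliberately ignored.

```typescript
interface ParsedEvent {
  event: string; // event name; "message" when the frame has no "event:" field
  data: string; // raw payload, still a JSON string
}

// Parse text/event-stream content that contains complete frames.
// Hypothetical helper for illustration; browsers do this inside EventSource.
function parseSseFrames(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  // Frames are separated by a blank line.
  for (const frame of text.split(/\r?\n\r?\n/)) {
    let event = "message";
    const dataLines: string[] = [];
    for (const line of frame.split(/\r?\n/)) {
      if (line === "" || line.startsWith(":")) continue; // blank or comment line
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      // One optional space after the colon is not part of the value.
      const value = colon === -1 ? "" : line.slice(colon + 1).replace(/^ /, "");
      if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
    }
    // Frames without any data line are not dispatched.
    if (dataLines.length > 0) events.push({ event, data: dataLines.join("\n") });
  }
  return events;
}

const parsed = parseSseFrames(
  'event: time\ndata: 1700000000000\n\n: keep-alive\n\ndata: "hi"\n\n',
);
```

The comment-only frame in the middle produces no event, which is why comment lines are a common choice for keep-alive traffic.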

React Example

import { useEffect, useState } from "react";

function TimeDisplay() {
  const [time, setTime] = useState<number | null>(null);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    const source = new EventSource("/api/v1/events/time");

    source.addEventListener("time", (event) => {
      setTime(JSON.parse(event.data));
    });

    source.addEventListener("error", () => {
      setError("Connection lost");
      source.close();
    });

    return () => {
      source.close();
    };
  }, []);

  if (error) return <div>Error: {error}</div>;
  if (!time) return <div>Connecting...</div>;
  
  return <div>Server time: {new Date(time).toISOString()}</div>;
}

Connection Lifecycle

Manage the connection lifecycle with provided helpers:

isClosed()

Check if the client has disconnected:
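// A dedicated factory whose events map declares the "count" event
const counterFactory = new EventStreamFactory({
  count: z.number().int().nonnegative(),
});

const endpoint = counterFactory.buildVoid({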
  input: z.object({}),
  handler: async ({ ctx: { emit, isClosed } }) => {
    let counter = 0;
    
    while (!isClosed()) {
      emit("count", counter++);
      await setTimeout(1000);
    }
    
    // Cleanup after client disconnects
    console.log("Stream closed, sent", counter, "events");
  },
});

signal (AbortSignal)

Use the abort signal for cleanup and cancellation:
const endpoint = eventsFactory.buildVoid({
  input: z.object({ topic: z.string() }),
  handler: async ({ input, ctx: { emit, signal }, logger }) => {
    const subscription = messageBus.subscribe(input.topic);
    
    // Listen for disconnect
    signal.addEventListener("abort", () => {
      logger.info("Client disconnected, cleaning up");
      subscription.unsubscribe();
    });
    
    // Forward messages to stream
    for await (const message of subscription) {
      if (signal.aborted) break;
      emit("message", message);
    }
  },
});
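
One detail of AbortSignal worth keeping in mind: a listener attached after the signal has already aborted never fires. The sketch below wraps that check in a small helper; onAbort is a hypothetical name and not part of Express Zod API, and a plain AbortController stands in for the request lifecycle.

```typescript
// Run `cleanup` exactly once when `signal` aborts, including the case where
// it was already aborted before the listener could be attached.
// `onAbort` is a hypothetical helper, not part of express-zod-api.
function onAbort(signal: AbortSignal, cleanup: () => void): void {
  if (signal.aborted) {
    cleanup();
    return;
  }
  // `once` removes the listener after its first invocation.
  signal.addEventListener("abort", cleanup, { once: true });
}

// Usage with a plain AbortController standing in for the client connection.
const controller = new AbortController();
const calls: string[] = [];
onAbort(controller.signal, () => calls.push("late"));
controller.abort(); // dispatches "abort" and runs the first cleanup
onAbort(controller.signal, () => calls.push("already")); // runs immediately
```

This matters when cleanup is registered after an awaited call, since the client may disconnect while the handler is still waiting.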

With Middleware

Add authentication and other middlewares to event streams:
import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";
import { authMiddleware } from "./middlewares";

const authenticatedEventsFactory = new EventStreamFactory({
  notification: z.object({
    id: z.string(),
    type: z.enum(["info", "warning", "error"]),
    message: z.string(),
    timestamp: z.number(),
  }),
})
  .addMiddleware(authMiddleware);

const notificationsEndpoint = authenticatedEventsFactory.buildVoid({
  input: z.object({}),
  handler: async ({ ctx: { user, emit, isClosed, signal } }) => {
    // user is available from authMiddleware
    const stream = await subscribeToUserNotifications(user.id);
    
    signal.addEventListener("abort", () => {
      stream.unsubscribe();
    });
    
    for await (const notification of stream) {
      if (signal.aborted) break;
      emit("notification", notification);
    }
  },
});

Real-World Examples

Live Metrics Dashboard

import { EventStreamFactory } from "express-zod-api";
import { z } from "zod";
import { setTimeout } from "node:timers/promises";

const metricsFactory = new EventStreamFactory({
  metrics: z.object({
    cpu: z.number().min(0).max(100),
    memory: z.number().min(0).max(100),
    requests: z.number().int().nonnegative(),
    errors: z.number().int().nonnegative(),
    timestamp: z.number(),
  }),
});

const metricsEndpoint = metricsFactory.buildVoid({
  input: z.object({
    // Query string values arrive as strings, so coerce before validating
    interval: z.coerce.number().int().min(1000).default(5000),
  }),
  handler: async ({ input, ctx: { emit, isClosed } }) => {
    while (!isClosed()) {
      const metrics = await collectSystemMetrics();
      
      emit("metrics", {
        ...metrics,
        timestamp: Date.now(),
      });
      
      await setTimeout(input.interval);
    }
  },
});

Progress Tracking

const progressFactory = new EventStreamFactory({
  progress: z.object({
    taskId: z.string(),
    percentage: z.number().min(0).max(100),
    status: z.enum(["pending", "processing", "completed", "failed"]),
    message: z.string(),
  }),
});

const taskProgressEndpoint = progressFactory.buildVoid({
  input: z.object({
    taskId: z.string().uuid(),
  }),
  handler: async ({ input, ctx: { emit, isClosed, signal } }) => {
    const task = await Task.findById(input.taskId);
    
    task.on("progress", (progress) => {
      if (!isClosed()) {
        emit("progress", {
          taskId: input.taskId,
          percentage: progress.percentage,
          status: progress.status,
          message: progress.message,
        });
      }
    });
    
    // Wait for completion or disconnect
    await Promise.race([
      task.waitForCompletion(),
      new Promise((resolve) => {
        signal.addEventListener("abort", resolve);
      }),
    ]);
    
    task.removeAllListeners();
  },
});

Log Streaming

import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";
import { Tail } from "tail"; // the "tail" package exports the Tail class

const logsFactory = new EventStreamFactory({
  log: z.object({
    level: z.enum(["info", "warn", "error", "debug"]),
    message: z.string(),
    timestamp: z.string(),
    source: z.string(),
  }),
});

const logStreamEndpoint = logsFactory.buildVoid({
  input: z.object({
    // Restrict the name so it cannot escape /var/log via "../"
    service: z.string().regex(/^[\w-]+$/),
    level: z.enum(["info", "warn", "error", "debug"]).optional(),
  }),
  handler: async ({ input, ctx: { emit, signal }, logger }) => {
    const logFile = `/var/log/${input.service}.log`;
    const tailer = new Tail(logFile);
    
    tailer.on("line", (line) => {
      // parseLogLine is an application-specific helper (not shown)
      const log = parseLogLine(line);
      if (!input.level || log.level === input.level) {
        emit("log", log);
      }
    });
    
    // Keep the handler pending until the client disconnects
    await new Promise<void>((resolve) => {
      signal.addEventListener("abort", () => {
        logger.info("Stopping log stream");
        tailer.unwatch();
        resolve();
      });
    });
  },
});

Error Handling

Handle errors in event streams:
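import createHttpError from "http-errors";

// Assumes eventsFactory declares a "data" event in its events map
const endpoint = eventsFactory.buildVoid({
  input: z.object({ trigger: z.string().optional() }),
  handler: async ({ input, ctx: { emit, isClosed }, logger }) => {
    // Trigger error for testing
    if (input.trigger === "failure") {
      throw new Error("Intentional failure");
    }
    
    try {
      while (!isClosed()) {
        const data = await fetchData();
        emit("data", data);
        await setTimeout(1000);
      }
    } catch (error) {
      // Log the original cause before replacing it with a public error
      logger.error("Data fetch failed", error);
      // A thrown error terminates the stream; an error status can only be
      // delivered while the response headers have not been sent yet
      throw createHttpError(500, "Data fetch failed");
    }
  },
});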

Routing

Add SSE endpoints to your routing:
import { Routing } from "express-zod-api";

const routing: Routing = {
  v1: {
    events: {
      time: subscriptionEndpoint,
      metrics: metricsEndpoint,
      notifications: notificationsEndpoint,
    },
  },
};

Generated Client Support

The Integration generator creates a Subscription class for type-safe SSE consumption:
import { Integration } from "express-zod-api";

const client = new Integration({
  routing,
  config,
  variant: "client",
});

const code = await client.printFormatted();
Generated client usage:
import { Subscription } from "./generated-client";

const subscription = new Subscription("get /v1/events/time", {
  interval: 1000,
});

subscription.on("time", (timestamp) => {
  console.log("Time:", new Date(timestamp));
});

// Access the underlying EventSource
subscription.source.addEventListener("error", (err) => {
  console.error("Connection error", err);
});

// Close the connection
subscription.source.close();

Best Practices

  • Always check isClosed() before emitting events to avoid errors when the client has disconnected.
  • Register cleanup logic with the signal to release resources when clients disconnect.
  • Define strict Zod schemas for your events; the framework validates all emitted data automatically.
  • Don't emit events faster than clients can consume them. Use appropriate intervals or queue mechanisms.
  • Consider implementing heartbeat/ping events to detect stale connections.
  • Test how your handlers behave when clients disconnect unexpectedly.
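
The advice about not outpacing clients can be sketched as a small wrapper around an emit-like callback. This is one possible approach under stated assumptions, not a feature of Express Zod API: throttleEmit is a hypothetical helper that drops values arriving sooner than a minimum interval, and the injectable clock exists only to make the behaviour easy to verify.

```typescript
type Emit<T> = (data: T) => void;

// Wrap an emit-like callback so that it forwards at most one value per
// `minIntervalMs`; values arriving sooner are dropped.
// `throttleEmit` and the injectable `now` clock are hypothetical.
function throttleEmit<T>(
  emit: Emit<T>,
  minIntervalMs: number,
  now: () => number = Date.now,
): Emit<T> {
  let lastSent = Number.NEGATIVE_INFINITY;
  return (data) => {
    const current = now();
    if (current - lastSent < minIntervalMs) return; // too soon, drop the value
    lastSent = current;
    emit(data);
  };
}

// Usage with a fake clock so the behaviour is deterministic.
let fakeTime = 0;
const sent: number[] = [];
const throttled = throttleEmit<number>((n) => sent.push(n), 1000, () => fakeTime);
throttled(1); // forwarded at t = 0
fakeTime = 400;
throttled(2); // dropped, only 400 ms elapsed
fakeTime = 1000;
throttled(3); // forwarded, 1000 ms elapsed since the last send
```

Dropping suits snapshot-style events such as metrics, where only the latest value matters. For events that must all be delivered, such as chat messages, a queue would be the safer choice.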

Limitations

  • SSE is one-way (server to client only)
  • Limited to text-based data (JSON strings)
  • Some proxies and firewalls may interfere
  • Browsers allow about 6 concurrent connections per domain over HTTP/1.1; over HTTP/2 the stream limit is negotiated and is typically much higher
  • No binary data support (use base64 encoding if needed)
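
The base64 workaround mentioned above can be sketched as follows for Node.js. The helper names encodeBinary and decodeBinary are hypothetical; the encoded string would travel as an ordinary field of a JSON event payload.

```typescript
import { Buffer } from "node:buffer";

// Encode raw bytes as base64 so they can travel inside a JSON string payload.
function encodeBinary(bytes: Uint8Array): string {
  return Buffer.from(bytes).toString("base64");
}

// Decode on the receiving side (Node.js shown; browsers would use atob()).
function decodeBinary(encoded: string): Uint8Array {
  return new Uint8Array(Buffer.from(encoded, "base64"));
}

const original = new Uint8Array([0, 255, 16, 32]);
const encoded = encodeBinary(original);
const roundTrip = decodeBinary(encoded);
```

Base64 grows the payload by roughly a third, so for frequent or large binary messages WebSockets remain the better fit.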

Comparison with WebSockets

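Feature         | SSE                       | WebSockets
----------------|---------------------------|----------------------------
Direction       | Server → Client           | Bidirectional
Protocol        | HTTP                      | WS/WSS
Reconnection    | Automatic                 | Manual
Browser Support | Good                      | Excellent
Overhead        | Lower                     | Higher
Binary Data     | No                        | Yes
Use Case        | Live feeds, notifications | Chat, gaming, collaboration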
