Overview
Express Zod API supports real-time data streaming using Server-Sent Events (SSE). This lightweight alternative to WebSockets enables your server to push updates to clients over a standard HTTP connection.
For bidirectional communication, consider Zod Sockets, a companion library for WebSocket support.
What are Server-Sent Events?
Server-Sent Events provide:
Unidirectional server-to-client communication
Automatic reconnection
Event-based message delivery
Built-in browser support via EventSource
Lower overhead than WebSockets for one-way streaming
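To make "event-based message delivery" concrete, here is a minimal sketch of how a single named event is framed in the standard text/event-stream format. The helper name formatSseFrame is hypothetical and only illustrates the wire format; it is not part of Express Zod API, which handles the framing for you.

```typescript
// Hypothetical helper illustrating text/event-stream framing (not a library API).
function formatSseFrame(event: string, data: unknown): string {
  // JSON.stringify never emits raw newlines, so the payload fits on one "data:" line.
  const payload = JSON.stringify(data);
  // Each field is "name: value" on its own line; an empty line terminates the frame.
  return `event: ${event}\ndata: ${payload}\n\n`;
}

const frame = formatSseFrame("time", 1700000000000);
console.log(JSON.stringify(frame));
```

On the client, EventSource dispatches this frame to listeners registered for "time" and exposes the payload as the string event.data.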
Creating an Event Stream
Use EventStreamFactory to create streaming endpoints:
    import { z } from "zod";
    import { EventStreamFactory } from "express-zod-api";
    import { setTimeout } from "node:timers/promises";

    // Define your events schema
    const eventsFactory = new EventStreamFactory({
      time: z.number().int().positive(),
    });

    // Build the streaming endpoint
    const subscriptionEndpoint = eventsFactory.buildVoid({
      input: z.object({
        // EventSource issues GET requests, so this value arrives as a query string and is coerced to a number
        interval: z.coerce.number().int().positive().default(1000),
      }),
      handler: async ({ input, ctx: { emit, isClosed } }) => {
        while (!isClosed()) {
          // Emit events to the stream
          emit("time", Date.now());
          // Wait before next event
          await setTimeout(input.interval);
        }
      },
    });
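Because EventSource can only issue GET requests, input such as interval reaches the server as query-string text rather than as a number. The sketch below shows that conversion step in isolation using the standard URL API. The function name parseInterval and the fallback of 1000 are assumptions made for illustration; in an endpoint, the input schema is the place to do this.

```typescript
// Hypothetical helper: reads "interval" from a stream URL and converts it to a number.
function parseInterval(streamUrl: string, fallback = 1000): number {
  // searchParams.get() returns a string, or null when the parameter is absent.
  const raw = new URL(streamUrl).searchParams.get("interval");
  if (raw === null) return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new RangeError(`interval must be a positive integer, got "${raw}"`);
  }
  return value;
}

console.log(parseInterval("https://api.example.com/v1/events/stream?interval=250"));
```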
Multiple Event Types
Define multiple event types in a single stream:
    import { z } from "zod";
    import { EventStreamFactory } from "express-zod-api";

    const eventsFactory = new EventStreamFactory({
      // User events
      userJoined: z.object({
        userId: z.string(),
        username: z.string(),
        timestamp: z.number(),
      }),
      userLeft: z.object({
        userId: z.string(),
        timestamp: z.number(),
      }),
      // Message events
      message: z.object({
        id: z.string(),
        userId: z.string(),
        text: z.string(),
        timestamp: z.number(),
      }),
      // Status events
      status: z.object({
        activeUsers: z.number(),
        serverLoad: z.number(),
      }),
    });

    const chatStreamEndpoint = eventsFactory.buildVoid({
      input: z.object({
        roomId: z.string(),
      }),
      handler: async ({ input, ctx: { emit, isClosed, signal }, logger }) => {
        // joinRoom() is an application-specific helper, not part of express-zod-api
        const room = await joinRoom(input.roomId);

        // Emit initial status
        emit("status", {
          activeUsers: room.users.length,
          serverLoad: 0.5,
        });

        // Listen for room events
        room.on("userJoined", (user) => {
          if (!isClosed()) {
            emit("userJoined", {
              userId: user.id,
              username: user.name,
              timestamp: Date.now(),
            });
          }
        });
        room.on("message", (msg) => {
          if (!isClosed()) {
            emit("message", msg);
          }
        });

        // Wait until the connection closes so the handler does not return early
        await new Promise<void>((resolve) => {
          if (signal.aborted) return resolve();
          signal.addEventListener("abort", () => resolve(), { once: true });
        });
        logger.info("Client disconnected");
        room.leave();
      },
    });
Client-Side Consumption
Consume SSE streams using the native EventSource API:
    // Vanilla JavaScript
    const source = new EventSource(
      "https://api.example.com/v1/events/stream?interval=1000"
    );

    source.addEventListener("time", (event) => {
      const timestamp = JSON.parse(event.data);
      console.log("Server time:", new Date(timestamp));
    });

    source.addEventListener("error", (error) => {
      console.error("Connection error:", error);
      source.close();
    });

    // Close when done
    // source.close();
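Closing the source inside the error listener, as above, also turns off the browser's automatic reconnection. If you would rather keep reconnection, one option is to inspect readyState: the browser reports CONNECTING while it is retrying and CLOSED once it has given up. The sketch below isolates that decision in a hypothetical helper named classifyError; the numeric values are the readyState constants defined for EventSource.

```typescript
// readyState values defined for EventSource by the HTML specification.
const READY_STATE = { CONNECTING: 0, OPEN: 1, CLOSED: 2 } as const;

type ErrorAction = "retrying" | "closed";

// Hypothetical helper: decides whether an "error" event is transient or final.
function classifyError(readyState: number): ErrorAction {
  return readyState === READY_STATE.CLOSED ? "closed" : "retrying";
}

// Usage in a browser, assuming `source` is an EventSource:
// source.addEventListener("error", () => {
//   if (classifyError(source.readyState) === "closed") {
//     console.error("Stream closed permanently");
//   }
// });
console.log(classifyError(READY_STATE.CONNECTING), classifyError(READY_STATE.CLOSED));
```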
React Example
    import { useEffect, useState } from "react";

    function TimeDisplay() {
      const [time, setTime] = useState<number | null>(null);
      const [error, setError] = useState<string | null>(null);

      useEffect(() => {
        const source = new EventSource("/api/v1/events/time");
        source.addEventListener("time", (event) => {
          setTime(JSON.parse(event.data));
        });
        source.addEventListener("error", () => {
          setError("Connection lost");
          source.close();
        });
        return () => {
          source.close();
        };
      }, []);

      if (error) return <div>Error: {error}</div>;
      if (!time) return <div>Connecting...</div>;
      return <div>Server time: {new Date(time).toISOString()}</div>;
    }
Connection Lifecycle
Manage the connection lifecycle with provided helpers:
isClosed()
Check if the client has disconnected:
    // Assumes eventsFactory declares a "count" event and that setTimeout
    // is imported from "node:timers/promises"
    const endpoint = eventsFactory.buildVoid({
      input: z.object({}),
      handler: async ({ ctx: { emit, isClosed } }) => {
        let counter = 0;
        while (!isClosed()) {
          emit("count", counter++);
          await setTimeout(1000);
        }
        // Cleanup after client disconnects
        console.log("Stream closed, sent", counter, "events");
      },
    });
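With a polling loop like this, a disconnect is only noticed after the current delay finishes. One way to wake up sooner, sketched below, is to pass an AbortSignal to the promise-based setTimeout from node:timers/promises, which rejects with an AbortError when the signal fires. The helper name sleepUnlessAborted is hypothetical; in a handler you would pass it the signal from ctx.

```typescript
import { setTimeout as sleep } from "node:timers/promises";

// Hypothetical helper: resolves to true after the full delay,
// or to false if the signal aborts before the delay elapses.
async function sleepUnlessAborted(ms: number, signal: AbortSignal): Promise<boolean> {
  try {
    await sleep(ms, undefined, { signal });
    return true;
  } catch (error) {
    // Node rejects with an error named "AbortError" when the signal fires.
    if (error instanceof Error && error.name === "AbortError") return false;
    throw error;
  }
}

// Demo: abort right away instead of waiting a full minute.
const demoController = new AbortController();
const demoSleep = sleepUnlessAborted(60_000, demoController.signal);
demoController.abort();
demoSleep.then((completed) => console.log("completed:", completed));
```

Inside a handler, the loop condition could then become `while (await sleepUnlessAborted(1000, signal))`, with the emit call placed in the loop body.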
signal (AbortSignal)
Use the abort signal for cleanup and cancellation:
    const endpoint = eventsFactory.buildVoid({
      input: z.object({ topic: z.string() }),
      handler: async ({ input, ctx: { emit, signal }, logger }) => {
        const subscription = messageBus.subscribe(input.topic);

        // Listen for disconnect
        signal.addEventListener("abort", () => {
          logger.info("Client disconnected, cleaning up");
          subscription.unsubscribe();
        });

        // Forward messages to stream
        for await (const message of subscription) {
          if (signal.aborted) break;
          emit("message", message);
        }
      },
    });
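When a handler forwards events from callbacks rather than from an async iterator, it has nothing to await and needs another way to stay pending until the client leaves. A small sketch of one approach follows: wrap the abort event in a promise. The name untilAborted is hypothetical, and the demo uses a locally created AbortController in place of the signal that a handler receives through ctx.

```typescript
// Hypothetical helper: resolves once the given signal is aborted.
function untilAborted(signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    // Handle the case where the client is already gone.
    if (signal.aborted) return resolve();
    signal.addEventListener("abort", () => resolve(), { once: true });
  });
}

// Demo with a local controller standing in for the request's signal.
const controller = new AbortController();
let disconnected = false;
untilAborted(controller.signal).then(() => {
  disconnected = true;
  console.log("cleanup would run here");
});
controller.abort();
```

In a handler this becomes `await untilAborted(signal);` followed by whatever cleanup the stream needs.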
With Middleware
Add authentication and other middlewares to event streams:
    import { z } from "zod";
    import { EventStreamFactory } from "express-zod-api";
    import { authMiddleware } from "./middlewares";

    const authenticatedEventsFactory = new EventStreamFactory({
      notification: z.object({
        id: z.string(),
        type: z.enum(["info", "warning", "error"]),
        message: z.string(),
        timestamp: z.number(),
      }),
    }).addMiddleware(authMiddleware);

    const notificationsEndpoint = authenticatedEventsFactory.buildVoid({
      input: z.object({}),
      handler: async ({ ctx: { user, emit, isClosed, signal } }) => {
        // user is available from authMiddleware
        const stream = await subscribeToUserNotifications(user.id);

        signal.addEventListener("abort", () => {
          stream.unsubscribe();
        });

        for await (const notification of stream) {
          if (signal.aborted) break;
          emit("notification", notification);
        }
      },
    });
Real-World Examples
Live Metrics Dashboard
    import { EventStreamFactory } from "express-zod-api";
    import { z } from "zod";
    import { setTimeout } from "node:timers/promises";

    const metricsFactory = new EventStreamFactory({
      metrics: z.object({
        cpu: z.number().min(0).max(100),
        memory: z.number().min(0).max(100),
        requests: z.number().int().nonnegative(),
        errors: z.number().int().nonnegative(),
        timestamp: z.number(),
      }),
    });

    const metricsEndpoint = metricsFactory.buildVoid({
      input: z.object({
        // Query-string values are strings, so coerce before validating the range
        interval: z.coerce.number().int().min(1000).default(5000),
      }),
      handler: async ({ input, ctx: { emit, isClosed } }) => {
        while (!isClosed()) {
          // collectSystemMetrics() is an application-specific helper
          const metrics = await collectSystemMetrics();
          emit("metrics", {
            ...metrics,
            timestamp: Date.now(),
          });
          await setTimeout(input.interval);
        }
      },
    });
Progress Tracking
    const progressFactory = new EventStreamFactory({
      progress: z.object({
        taskId: z.string(),
        percentage: z.number().min(0).max(100),
        status: z.enum(["pending", "processing", "completed", "failed"]),
        message: z.string(),
      }),
    });

    const taskProgressEndpoint = progressFactory.buildVoid({
      input: z.object({
        taskId: z.string().uuid(),
      }),
      handler: async ({ input, ctx: { emit, isClosed, signal } }) => {
        const task = await Task.findById(input.taskId);

        task.on("progress", (progress) => {
          if (!isClosed()) {
            emit("progress", {
              taskId: input.taskId,
              percentage: progress.percentage,
              status: progress.status,
              message: progress.message,
            });
          }
        });

        // Wait for completion or disconnect
        await Promise.race([
          task.waitForCompletion(),
          new Promise((resolve) => {
            signal.addEventListener("abort", resolve);
          }),
        ]);
        task.removeAllListeners();
      },
    });
Log Streaming
    import { Tail } from "tail"; // the "tail" package exports the Tail class

    const logsFactory = new EventStreamFactory({
      log: z.object({
        level: z.enum(["info", "warn", "error", "debug"]),
        message: z.string(),
        timestamp: z.string(),
        source: z.string(),
      }),
    });

    const logStreamEndpoint = logsFactory.buildVoid({
      input: z.object({
        service: z.string(),
        level: z.enum(["info", "warn", "error", "debug"]).optional(),
      }),
      handler: async ({ input, ctx: { emit, signal }, logger }) => {
        const logFile = `/var/log/${input.service}.log`;
        const tailer = new Tail(logFile);

        tailer.on("line", (line) => {
          // parseLogLine() is an application-specific helper
          const log = parseLogLine(line);
          if (!input.level || log.level === input.level) {
            emit("log", log);
          }
        });

        // Wait until the connection closes so the handler does not return early
        await new Promise<void>((resolve) => {
          if (signal.aborted) return resolve();
          signal.addEventListener("abort", () => resolve(), { once: true });
        });
        logger.info("Stopping log stream");
        tailer.unwatch();
      },
    });
Error Handling
Handle errors in event streams:
    import createHttpError from "http-errors";
    import { setTimeout } from "node:timers/promises";

    // Assumes eventsFactory declares a "data" event; fetchData() is an application-specific helper
    const endpoint = eventsFactory.buildVoid({
      input: z.object({ trigger: z.string().optional() }),
      handler: async ({ input, ctx: { emit, isClosed } }) => {
        // Trigger error for testing
        if (input.trigger === "failure") {
          throw new Error("Intentional failure");
        }
        try {
          while (!isClosed()) {
            const data = await fetchData();
            emit("data", data);
            await setTimeout(1000);
          }
        } catch (error) {
          // Errors terminate the stream and send error response
          throw createHttpError(500, "Data fetch failed");
        }
      },
    });
Routing
Add SSE endpoints to your routing:
    import { Routing } from "express-zod-api";

    const routing: Routing = {
      v1: {
        events: {
          time: subscriptionEndpoint,
          metrics: metricsEndpoint,
          notifications: notificationsEndpoint,
        },
      },
    };
Generated Client Support
The Integration generator creates a Subscription class for type-safe SSE consumption:
    import { Integration } from "express-zod-api";

    const client = new Integration({
      routing,
      config,
      variant: "client",
    });

    const code = await client.printFormatted();
Generated client usage:
    import { Subscription } from "./generated-client";

    const subscription = new Subscription("get /v1/events/time", {
      interval: 1000,
    });

    subscription.on("time", (timestamp) => {
      console.log("Time:", new Date(timestamp));
    });

    // Access the underlying EventSource
    subscription.source.addEventListener("error", (err) => {
      console.error("Connection error", err);
    });

    // Close the connection
    subscription.source.close();
Best Practices
Always check isClosed() before emitting events to avoid errors when the client has disconnected.
Register cleanup logic with the signal to release resources when clients disconnect.
Define strict Zod schemas for your events; the framework validates all emitted data automatically.
Don’t emit events faster than clients can consume them. Use appropriate intervals or queue mechanisms.
Consider implementing heartbeat/ping events to detect stale connections.
Test disconnect scenarios: verify how your handlers behave when clients disconnect unexpectedly.
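The heartbeat suggestion above could be sketched as follows. This is a minimal illustration, not a built-in feature: startHeartbeat is a hypothetical helper, and it assumes the factory's event schema declares a ping event carrying a numeric timestamp. The emit and isClosed parameters stand in for the helpers a handler receives through ctx.

```typescript
// Assumed shape of emit for a factory that declares a "ping" event (hypothetical).
type EmitPing = (event: "ping", data: number) => void;

// Hypothetical helper: emits a ping on a fixed interval and returns a stop function.
function startHeartbeat(
  emit: EmitPing,
  isClosed: () => boolean,
  intervalMs: number,
): () => void {
  const timer = setInterval(() => {
    if (isClosed()) {
      // Stop ticking once the client has disconnected.
      clearInterval(timer);
      return;
    }
    emit("ping", Date.now());
  }, intervalMs);
  return () => clearInterval(timer);
}

// Demo with stand-ins for the handler's helpers.
const pings: number[] = [];
const stopHeartbeat = startHeartbeat((_event, sentAt) => pings.push(sentAt), () => false, 5);
setTimeout(() => {
  stopHeartbeat();
  console.log("pings sent:", pings.length);
}, 50);
```

A client can treat a missing ping within some multiple of the interval as a sign that the connection is stale and reconnect.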
Limitations
SSE is one-way (server to client only)
Limited to text-based data (JSON strings)
Some proxies and firewalls may interfere
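Browsers allow at most 6 concurrent connections per domain over HTTP/1.1 (shared across tabs); over HTTP/2 the limit is negotiated and is much higher (100 streams by default)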
No binary data support (use base64 encoding if needed)
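The base64 workaround mentioned above can be sketched with Node's Buffer. The helper names encodeBinary and decodeBinary are hypothetical; the encoded string would be emitted as an ordinary string field validated by z.string() in the event schema.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical helper: turns raw bytes into a base64 string that fits in a JSON event.
function encodeBinary(bytes: Uint8Array): string {
  return Buffer.from(bytes).toString("base64");
}

// Hypothetical helper: restores the original bytes from the base64 string.
function decodeBinary(text: string): Uint8Array {
  return new Uint8Array(Buffer.from(text, "base64"));
}

const encoded = encodeBinary(new Uint8Array([72, 105])); // the bytes of "Hi"
console.log(encoded); // prints "SGk="
console.log(Array.from(decodeBinary(encoded)));
```

In a browser, where Buffer is not available, atob can decode the same string. Keep in mind that base64 inflates the payload by roughly a third.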
Comparison with WebSockets
| Feature | SSE | WebSockets |
| --- | --- | --- |
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP | WS/WSS |
| Reconnection | Automatic | Manual |
| Browser Support | Good | Excellent |
| Overhead | Lower | Higher |
| Binary Data | No | Yes |
| Use Case | Live feeds, notifications | Chat, gaming, collaboration |