Skip to main content

Overview

The sse middleware enables Server-Sent Events (SSE) for real-time server-to-client communication. Unlike WebSocket, SSE is unidirectional and uses standard HTTP.

Installation

import "github.com/go-mizu/mizu/middlewares/sse"

Quick Start

app := mizu.New()

app.Get("/events", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Send events to client
    client.SendData("Hello!")

    // Keep connection open
    <-client.Done
}))

Configuration

OptionTypeDefaultDescription
BufferSizeint10Event channel buffer size
Retryint3000Reconnection time in ms

Examples

Basic Usage

app.Get("/events", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Send simple data events
    client.SendData("Event 1")
    client.SendData("Event 2")

    // Wait for client to disconnect
    <-client.Done
}))

Periodic Updates

app.Get("/time", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case t := <-ticker.C:
            client.SendData(t.Format(time.RFC3339))
        case <-client.Done:
            return
        }
    }
}))

Named Events

app.Get("/notifications", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Send different event types
    client.SendEvent("message", "New message received")
    client.SendEvent("alert", "System alert!")
    client.SendEvent("update", `{"version": "2.0"}`)

    <-client.Done
}))

Full Event Control

app.Get("/events", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Send event with all fields
    client.Send(&sse.Event{
        ID:    "1",
        Event: "notification",
        Data:  "You have a new message",
        Retry: 5000,
    })

    // Send with ID for resumption
    client.Send(&sse.Event{
        ID:   "2",
        Data: "Important update",
    })

    <-client.Done
}))

Using Broker for Broadcasting

// Create a broker for managing multiple clients
broker := sse.NewBroker()

app.Get("/stream", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Register client with broker
    broker.Register(client)

    // Wait for disconnect
    <-client.Done
}))

// Broadcast to all clients from anywhere
app.Post("/broadcast", func(c *mizu.Ctx) error {
    message := c.FormValue("message")
    broker.BroadcastData(message)
    return c.Text(200, "Sent to "+strconv.Itoa(broker.ClientCount())+" clients")
})

Broadcast Named Events

broker := sse.NewBroker()

// Broadcast different event types
broker.BroadcastEvent("chat", `{"user": "alice", "message": "Hello!"}`)
broker.BroadcastEvent("typing", `{"user": "bob"}`)
broker.BroadcastEvent("presence", `{"user": "carol", "status": "online"}`)

Custom Options

app.Get("/events", sse.WithOptions(
    func(c *mizu.Ctx, client *sse.Client) {
        // Handler
    },
    sse.Options{
        BufferSize: 50,  // Larger buffer
        Retry:      5000, // 5 second retry
    },
))

Resume from Last Event ID

app.Get("/events", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    // Check if client is resuming
    lastID := client.ID
    if lastID != "" {
        // Send missed events since lastID
        missedEvents := getEventsSince(lastID)
        for _, event := range missedEvents {
            client.Send(event)
        }
    }

    // Continue with new events
    for event := range newEvents {
        client.Send(&sse.Event{
            ID:   event.ID,
            Data: event.Data,
        })
    }
}))

Real-time Notifications

broker := sse.NewBroker()

app.Get("/notifications", sse.New(func(c *mizu.Ctx, client *sse.Client) {
    userID := c.Query("user_id")

    // Register with user-specific logic
    broker.Register(client)

    // Send welcome event
    client.SendEvent("connected", `{"status": "ok"}`)

    <-client.Done
}))

// Send notification from other parts of your app
func notifyUser(userID string, message string) {
    broker.BroadcastEvent("notification", message)
}

Event Structure

type Event struct {
    ID    string // Event ID (for resumption)
    Event string // Event type name
    Data  string // Event payload
    Retry int    // Retry time in ms
}

Client Methods

func (c *Client) Send(event *Event)
func (c *Client) SendData(data string)
func (c *Client) SendEvent(eventType, data string)
func (c *Client) Close()

Broker Methods

func NewBroker() *Broker
func (b *Broker) Register(client *Client)
func (b *Broker) Broadcast(event *Event)
func (b *Broker) BroadcastData(data string)
func (b *Broker) BroadcastEvent(eventType, data string)
func (b *Broker) ClientCount() int

API Reference

func New(handler Handler) mizu.Middleware
func WithOptions(handler Handler, opts Options) mizu.Middleware
func NewBroker() *Broker

type Handler func(c *mizu.Ctx, client *Client)

Client Example (JavaScript)

const events = new EventSource('/events');

// Listen for all events
events.onmessage = (e) => {
    console.log('Data:', e.data);
};

// Listen for specific event types
events.addEventListener('notification', (e) => {
    console.log('Notification:', e.data);
});

events.addEventListener('update', (e) => {
    const data = JSON.parse(e.data);
    console.log('Update:', data);
});

events.onerror = () => {
    console.log('Connection lost, reconnecting...');
};

Event Format

id: 123
event: notification
retry: 5000
data: Hello, World!

Technical Details

Architecture

The SSE middleware implements the W3C Server-Sent Events specification using Go’s HTTP response writer and flusher interfaces. The implementation consists of three main components:
  1. Client: Represents an individual SSE connection with buffered event channels and disconnection handling
  2. Event Loop: Goroutine-based event processing that listens for both new events and client disconnection signals
  3. Broker: Optional multi-client manager for broadcasting events to all connected clients

Connection Lifecycle

  1. Accept Header Check: Validates that the client accepts text/event-stream or */*
  2. Flusher Verification: Ensures the response writer supports HTTP flushing for streaming
  3. Header Setup: Sets required SSE headers (Content-Type, Cache-Control, Connection, X-Accel-Buffering)
  4. Client Creation: Initializes client with event channel (default buffer: 10) and done channel
  5. Event Loop Start: Launches goroutine to process events from the channel
  6. Handler Execution: Runs user-provided handler function
  7. Cleanup: Waits for client disconnection before returning

Event Processing

Events are formatted according to SSE specification:
  • ID field: Optional event identifier for client resumption (id: value\n)
  • Event field: Optional event type name (event: value\n)
  • Retry field: Optional reconnection time in milliseconds (retry: value\n)
  • Data field: Event payload, split by newlines (data: value\n for each line)
  • Terminator: Empty line (\n) signals end of event
Multiline data is automatically split and prefixed with data: for each line.

Broker Implementation

The broker uses a fan-out pattern with three internal channels:
  • Register channel: Adds new clients to the client map
  • Unregister channel: Removes disconnected clients from the map
  • Broadcast channel: Queues events for distribution (buffer: 100)
The broker runs a dedicated goroutine that:
  1. Manages client registration/unregistration
  2. Broadcasts events to all clients without blocking
  3. Skips clients with full buffers to prevent blocking

Concurrency Safety

  • Client send operations use mutex locks to prevent concurrent writes
  • Broker uses RWMutex for safe concurrent access to the client map
  • Done channel is checked before send operations to prevent panics
  • Multiple Close() calls are safe and non-blocking

Last-Event-ID Support

The middleware reads the Last-Event-ID header from reconnecting clients, allowing handlers to resume event streams from a specific point.

Best Practices

  • Use event IDs for client resumption
  • Set appropriate retry intervals
  • Close connections when done
  • Use Broker for multi-client scenarios
  • Keep payload sizes reasonable
  • Handle client disconnections gracefully

Testing

Test CaseDescriptionExpected Behavior
TestEventEvent structure creation with all fieldsEvent object correctly stores ID, Event, Data, and Retry fields
TestClient_CloseClient close operationClose() can be called multiple times safely without panic, Done channel is closed
TestClient_SendSending event to clientEvent is successfully sent through the Events channel
TestClient_SendDataSending data-only eventData is wrapped in Event struct and sent through channel
TestClient_SendEventSending named event with dataEvent type and data are correctly packaged and sent
TestClient_SendOnClosedChannelSending to closed clientSend operation does not block or panic after client is closed
TestClient_sendInternal send formatting with all fieldsOutput contains properly formatted id, event, data, and retry fields
TestClient_sendMultilineDataMultiline data formattingEach line is prefixed with β€œdata:” in the output
TestClient_sendEmptyFieldsSend with minimal event dataEmpty fields (ID, Event, Retry=0) are omitted from output
TestBrokerBroker client managementClients register successfully, receive broadcasts, and unregister on disconnect
TestBroker_BroadcastEventBroadcasting named eventsAll registered clients receive the event with correct type and data
TestBroker_BroadcastBroadcasting full Event objectEvent with all fields (ID, Event, Data, Retry) is delivered to clients
TestBroker_BroadcastWithFullBufferBroadcast to client with full bufferBroadcast operation does not block, skips clients with full buffers
TestOptionsOptions structure configurationBufferSize and Retry values are correctly stored
TestNewMiddleware creation with New()Middleware is successfully created with default options
TestWithOptions_DefaultValuesMiddleware with empty optionsDefault values (BufferSize: 10, Retry: 3000) are applied
TestWithOptions_CustomValuesMiddleware with custom optionsCustom BufferSize and Retry values are used
TestSSE_NonSSERequestNon-SSE Accept headerRequest passes through to next handler, normal response returned
TestSSE_EmptyAcceptAllowedEmpty Accept headerSSE connection is allowed (treated as accepting all types)
TestSSE_WildcardAcceptAccept: / headerSSE connection is allowed (wildcard matches event-stream)