Skip to main content
The Live server manages WebSocket connections, handles message routing, and provides hooks for authentication and lifecycle events.

Creating a Server

import "github.com/go-mizu/mizu/live"

server := live.New(live.Options{
    // Options here
})

Configuration Options

OnAuth

Called when a client attempts to connect. Return a value to store in the session, or an error to reject.
live.Options{
    OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
        // Option 1: Query parameter
        userID := r.URL.Query().Get("user")
        if userID == "" {
            return nil, errors.New("user required")
        }
        return userID, nil

        // Option 2: JWT token
        token := r.Header.Get("Authorization")
        claims, err := validateJWT(token)
        if err != nil {
            return nil, err
        }
        return claims.UserID, nil

        // Option 3: Cookie session
        cookie, err := r.Cookie("session")
        if err != nil {
            return nil, err
        }
        return getSessionUser(cookie.Value)
    },
}
If OnAuth is nil, all connections are accepted.

OnMessage

Called when a client sends a message. The message is parsed and topic/data are extracted.
live.Options{
    OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
        userID := s.Value().(string)
        log.Printf("Message from %s: %s", userID, topic)

        switch topic {
        case "subscribe":
            var req struct{ Topic string `json:"topic"` }
            json.Unmarshal(data, &req)
            server.Subscribe(s, req.Topic)

        case "chat":
            server.Publish("chat:general", data)
        }
    },
}
Client messages must be JSON with topic and data:
{"topic": "chat", "data": {"text": "Hello"}}

OnClose

Called when a connection closes.
live.Options{
    OnClose: func(s *live.Session, err error) {
        userID := s.Value().(string)
        if err != nil {
            log.Printf("User %s disconnected with error: %v", userID, err)
        } else {
            log.Printf("User %s disconnected cleanly", userID)
        }

        // Clean up
        removeFromOnlineList(userID)
    },
}

QueueSize

Per-session send buffer size. When full, the session is closed.
live.Options{
    QueueSize: 256,  // Default: 256
}

ReadLimit

Maximum incoming message size in bytes.
live.Options{
    ReadLimit: 4 * 1024 * 1024,  // Default: 4MB
}

Origins

List of allowed WebSocket origins.
live.Options{
    Origins: []string{
        "http://localhost:8080",
        "https://myapp.com",
    },
}
Use "*" to allow all origins (not recommended for production).

CheckOrigin

Custom origin validation (takes precedence over Origins).
live.Options{
    CheckOrigin: func(r *http.Request) bool {
        origin := r.Header.Get("Origin")
        return strings.HasSuffix(origin, ".mycompany.com")
    },
}

IDGenerator

Custom session ID generator.
live.Options{
    IDGenerator: func() string {
        return uuid.New().String()
    },
}

Server Methods

Handler

Returns an HTTP handler for WebSocket upgrades.
app.Get("/ws", mizu.Compat(server.Handler()))

// Or with standard library
http.Handle("/ws", server.Handler())

Publish

Sends data to all sessions subscribed to a topic.
server.Publish("chat:room1", []byte(`{"text":"Hello!"}`))

// With JSON
data, _ := json.Marshal(message)
server.Publish(topic, data)

Subscribe

Subscribes a session to a topic.
server.Subscribe(session, "chat:room1")
server.Subscribe(session, "user:"+userID)

Unsubscribe

Removes a session from a topic.
server.Unsubscribe(session, "chat:room1")

Complete Example

package main

import (
    "context"
    "encoding/json"
    "errors"
    "log"
    "net/http"

    "github.com/go-mizu/mizu"
    "github.com/go-mizu/mizu/live"
)

var server *live.Server

func main() {
    server = live.New(live.Options{
        // Authentication
        OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
            token := r.URL.Query().Get("token")
            userID, err := validateToken(token)
            if err != nil {
                return nil, err
            }
            return userID, nil
        },

        // Message handling
        OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
            handleMessage(s, topic, data)
        },

        // Cleanup on disconnect
        OnClose: func(s *live.Session, err error) {
            userID := s.Value().(string)
            log.Printf("User %s disconnected", userID)
            server.Publish("presence", []byte(`{"type":"leave","user":"`+userID+`"}`))
        },

        // Limits
        QueueSize: 512,
        ReadLimit: 64 * 1024,

        // Origins
        Origins: []string{"https://myapp.com"},
    })

    app := mizu.New()
    app.Get("/ws", mizu.Compat(server.Handler()))
    app.Listen(":8080")
}

func handleMessage(s *live.Session, topic string, data []byte) {
    userID := s.Value().(string)

    switch topic {
    case "subscribe":
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)

        // Validate subscription
        if canSubscribe(userID, req.Topic) {
            server.Subscribe(s, req.Topic)
        }

    case "unsubscribe":
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)
        server.Unsubscribe(s, req.Topic)

    case "chat":
        // Broadcast chat message
        msg, _ := json.Marshal(map[string]any{
            "user": userID,
            "data": json.RawMessage(data),
        })
        server.Publish("chat:general", msg)
    }
}

Mounting the Handler

With Mizu

app := mizu.New()
app.Get("/ws", mizu.Compat(server.Handler()))

With Standard Library

http.Handle("/ws", server.Handler())
http.ListenAndServe(":8080", nil)

Best Practices

1. Always Authenticate

OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
    // Never skip authentication in production
    token := r.Header.Get("Authorization")
    if token == "" {
        return nil, errors.New("unauthorized")
    }
    return validateToken(token)
},

2. Set Reasonable Limits

live.Options{
    QueueSize: 256,        // Prevent memory issues
    ReadLimit: 64 * 1024,  // Limit message size
}

3. Handle Close Gracefully

OnClose: func(s *live.Session, err error) {
    // Clean up resources
    userID := s.Value().(string)
    removeFromRooms(userID)
    notifyOthers(userID, "offline")
},

4. Validate Subscriptions

OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
    if topic == "subscribe" {
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)

        // Check permission before subscribing
        if !canSubscribe(s.Value().(string), req.Topic) {
            s.Send(live.Message{Topic: "error", Data: []byte(`"not allowed"`)})
            return
        }
        server.Subscribe(s, req.Topic)
    }
},