Skip to main content
The publish/subscribe pattern is at the heart of the live package. Sessions subscribe to topics, and when you publish to a topic, all subscribers receive the message.

How Pub/Sub Works

                    ┌─────────────────────────┐
                    │         Server          │
                    │                         │
 Publish to         │   Topic: "chat:room1"   │      Delivered to
 "chat:room1"  ────▶│   ┌─────────────────┐   │────▶  Session A
                    │   │  Subscribers:   │   │────▶  Session B
                    │   │  - Session A    │   │
                    │   │  - Session B    │   │       Session C (not subscribed)
                    │   │  - Session D    │   │────▶  Session D
                    │   └─────────────────┘   │
                    │                         │
                    └─────────────────────────┘

Subscribing

Subscribe sessions to topics via server methods:
// In OnMessage handler
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
    if topic == "subscribe" {
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)
        server.Subscribe(s, req.Topic)
    }

    if topic == "unsubscribe" {
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)
        server.Unsubscribe(s, req.Topic)
    }
}

Publishing

To All Subscribers

server.Publish("chat:room1", []byte(`{"text":"Hello everyone!"}`))

With JSON Data

data, _ := json.Marshal(map[string]any{
    "type": "message",
    "user": userID,
    "text": "Hello!",
})
server.Publish("chat:room1", data)

To a Single Session

s.Send(live.Message{
    Topic: "notification",
    Data:  []byte(`{"title":"Hello"}`),
})

Topic Naming Patterns

Use consistent, hierarchical names:

User Topics

user:{userID}              # Personal messages
user:{userID}:notifications  # User notifications

Room/Channel Topics

chat:general        # General chat
chat:room:{roomID}  # Specific room

Event Topics

events:user:created   # User events
events:order:updated  # Order events

Resource Topics

item:{itemID}         # Updates for specific item
team:{teamID}         # Team broadcasts

Common Patterns

Chat Room

OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
    userID := s.Value().(string)

    switch topic {
    case "join_room":
        var req struct{ Room string `json:"room"` }
        json.Unmarshal(data, &req)

        server.Subscribe(s, "room:"+req.Room)

        // Announce join
        msg, _ := json.Marshal(map[string]string{"type": "join", "user": userID})
        server.Publish("room:"+req.Room, msg)

    case "chat":
        var req struct {
            Room string `json:"room"`
            Text string `json:"text"`
        }
        json.Unmarshal(data, &req)

        msg, _ := json.Marshal(map[string]string{
            "type": "message",
            "user": userID,
            "text": req.Text,
        })
        server.Publish("room:"+req.Room, msg)
    }
}

Notifications

// Send notification from anywhere in your app
func NotifyUser(userID string, notification Notification) {
    data, _ := json.Marshal(notification)
    server.Publish("user:"+userID+":notifications", data)
}

// Usage
NotifyUser("user123", Notification{
    Title:   "Order Shipped",
    Message: "Your order #456 has shipped!",
})

Real-time Updates

// When data changes, notify watchers
func UpdateItem(item Item) error {
    // Save to database
    if err := db.Save(item); err != nil {
        return err
    }

    // Notify all watchers
    data, _ := json.Marshal(map[string]any{
        "type": "updated",
        "item": item,
    })
    server.Publish("item:"+item.ID, data)

    return nil
}

Presence

OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
    userID := s.Value().(string)

    if topic == "join_room" {
        var req struct{ Room string `json:"room"` }
        json.Unmarshal(data, &req)

        // Subscribe to room presence and messages
        server.Subscribe(s, "room:"+req.Room+":presence")
        server.Subscribe(s, "room:"+req.Room+":messages")

        // Announce presence
        msg, _ := json.Marshal(map[string]string{"type": "join", "user": userID})
        server.Publish("room:"+req.Room+":presence", msg)
    }
},

OnClose: func(s *live.Session, err error) {
    userID := s.Value().(string)
    // Note: You'd need to track which rooms the user was in
    // to announce departure
}

Publishing from HTTP Handlers

You can publish from any part of your application:
// API endpoint triggers notification
app.Post("/api/items", func(c *mizu.Ctx) error {
    var item Item
    c.BodyJSON(&item)

    // Save
    db.Save(&item)

    // Notify via WebSocket
    data, _ := json.Marshal(map[string]any{
        "type": "new_item",
        "item": item,
    })
    server.Publish("team:"+item.TeamID, data)

    return c.JSON(item)
})

// Background job publishes updates
func processJob(job Job) {
    // ... do work ...

    progress, _ := json.Marshal(map[string]any{
        "type":     "progress",
        "job_id":   job.ID,
        "progress": job.Progress,
    })
    server.Publish("job:"+job.ID, progress)
}

Message Format

Messages sent to clients have this format:
{
    "topic": "chat:room1",
    "data": {"type": "message", "user": "alice", "text": "Hello!"}
}
Client-side parsing:
ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    console.log(`Topic: ${msg.topic}`);
    console.log(`Data:`, msg.data);
};

Performance Tips

Topic Granularity

// Too broad - everyone gets everything
server.Subscribe(s, "all_updates")

// Too narrow - many small topics
server.Subscribe(s, "user:123:item:456:field:name")

// Just right - meaningful groups
server.Subscribe(s, "user:"+userID)
server.Subscribe(s, "team:"+teamID)

Message Size

// Bad: send large objects
server.Publish(topic, entireLargeObject)

// Good: send just what changed
server.Publish(topic, []byte(`{"id":"123","status":"updated"}`))

Fan-out

Be aware of high fan-out:
// 10,000 subscribers = 10,000 messages
server.Publish("global", message)

// Consider if all really need this update

Subscription Validation

Always validate subscriptions:
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
    if topic == "subscribe" {
        var req struct{ Topic string `json:"topic"` }
        json.Unmarshal(data, &req)

        userID := s.Value().(string)

        // Validate permission
        if !canSubscribe(userID, req.Topic) {
            s.Send(live.Message{
                Topic: "error",
                Data:  []byte(`{"message":"not authorized"}`),
            })
            return
        }

        server.Subscribe(s, req.Topic)
    }
}

func canSubscribe(userID, topic string) bool {
    // Check topic patterns
    if strings.HasPrefix(topic, "admin:") {
        return isAdmin(userID)
    }
    if strings.HasPrefix(topic, "team:") {
        teamID := strings.Split(strings.TrimPrefix(topic, "team:"), ":")[0]
        return isTeamMember(userID, teamID)
    }
    if strings.HasPrefix(topic, "user:") {
        targetUser := strings.Split(strings.TrimPrefix(topic, "user:"), ":")[0]
        return targetUser == userID  // Can only subscribe to own topics
    }
    return true
}

Best Practices

1. Use Namespaced Topics

// Good: clear hierarchy
"chat:room:123"
"user:456:notifications"

// Avoid: ambiguous
"room123"
"notifications"

2. Document Conventions

// Topic conventions:
// - user:{id}           Direct messages to user
// - user:{id}:notifs    User notifications
// - team:{id}           Team broadcasts
// - chat:{roomId}       Chat room messages
// - presence:{roomId}   Room presence updates

3. Consistent Message Format

type Message struct {
    Type string `json:"type"`
    Data any    `json:"data,omitempty"`
}

func publish(topic string, msgType string, data any) {
    msg := Message{Type: msgType, Data: data}
    bytes, _ := json.Marshal(msg)
    server.Publish(topic, bytes)
}

// Usage
publish("chat:room1", "message", ChatMessage{User: "alice", Text: "Hi"})
publish("chat:room1", "typing", TypingIndicator{User: "bob"})