How Pub/Sub Works
Copy
┌─────────────────────────┐
│ Server │
│ │
Publish to │ Topic: "chat:room1" │ Delivered to
"chat:room1" ────▶│ ┌─────────────────┐ │────▶ Session A
│ │ Subscribers: │ │────▶ Session B
│ │ - Session A │ │
│ │ - Session B │ │ Session C (not subscribed)
│ │ - Session D │ │────▶ Session D
│ └─────────────────┘ │
│ │
└─────────────────────────┘
Subscribing
Subscribe sessions to topics via server methods:Copy
// In OnMessage handler
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
if topic == "subscribe" {
var req struct{ Topic string `json:"topic"` }
json.Unmarshal(data, &req)
server.Subscribe(s, req.Topic)
}
if topic == "unsubscribe" {
var req struct{ Topic string `json:"topic"` }
json.Unmarshal(data, &req)
server.Unsubscribe(s, req.Topic)
}
}
Publishing
To All Subscribers
Copy
server.Publish("chat:room1", []byte(`{"text":"Hello everyone!"}`))
With JSON Data
Copy
data, _ := json.Marshal(map[string]any{
"type": "message",
"user": userID,
"text": "Hello!",
})
server.Publish("chat:room1", data)
To a Single Session
Copy
s.Send(live.Message{
Topic: "notification",
Data: []byte(`{"title":"Hello"}`),
})
Topic Naming Patterns
Use consistent, hierarchical names:User Topics
Copy
user:{userID} # Personal messages
user:{userID}:notifications # User notifications
Room/Channel Topics
Copy
chat:general # General chat
chat:room:{roomID} # Specific room
Event Topics
Copy
events:user:created # User events
events:order:updated # Order events
Resource Topics
Copy
item:{itemID} # Updates for specific item
team:{teamID} # Team broadcasts
Common Patterns
Chat Room
Copy
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
userID := s.Value().(string)
switch topic {
case "join_room":
var req struct{ Room string `json:"room"` }
json.Unmarshal(data, &req)
server.Subscribe(s, "room:"+req.Room)
// Announce join
msg, _ := json.Marshal(map[string]string{"type": "join", "user": userID})
server.Publish("room:"+req.Room, msg)
case "chat":
var req struct {
Room string `json:"room"`
Text string `json:"text"`
}
json.Unmarshal(data, &req)
msg, _ := json.Marshal(map[string]string{
"type": "message",
"user": userID,
"text": req.Text,
})
server.Publish("room:"+req.Room, msg)
}
}
Notifications
Copy
// Send notification from anywhere in your app
func NotifyUser(userID string, notification Notification) {
data, _ := json.Marshal(notification)
server.Publish("user:"+userID+":notifications", data)
}
// Usage
NotifyUser("user123", Notification{
Title: "Order Shipped",
Message: "Your order #456 has shipped!",
})
Real-time Updates
Copy
// When data changes, notify watchers
func UpdateItem(item Item) error {
// Save to database
if err := db.Save(item); err != nil {
return err
}
// Notify all watchers
data, _ := json.Marshal(map[string]any{
"type": "updated",
"item": item,
})
server.Publish("item:"+item.ID, data)
return nil
}
Presence
Copy
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
userID := s.Value().(string)
if topic == "join_room" {
var req struct{ Room string `json:"room"` }
json.Unmarshal(data, &req)
// Subscribe to room presence and messages
server.Subscribe(s, "room:"+req.Room+":presence")
server.Subscribe(s, "room:"+req.Room+":messages")
// Announce presence
msg, _ := json.Marshal(map[string]string{"type": "join", "user": userID})
server.Publish("room:"+req.Room+":presence", msg)
}
},
OnClose: func(s *live.Session, err error) {
userID := s.Value().(string)
// Note: You'd need to track which rooms the user was in
// to announce departure
}
Publishing from HTTP Handlers
You can publish from any part of your application:Copy
// API endpoint triggers notification
app.Post("/api/items", func(c *mizu.Ctx) error {
var item Item
c.BodyJSON(&item)
// Save
db.Save(&item)
// Notify via WebSocket
data, _ := json.Marshal(map[string]any{
"type": "new_item",
"item": item,
})
server.Publish("team:"+item.TeamID, data)
return c.JSON(item)
})
// Background job publishes updates
func processJob(job Job) {
// ... do work ...
progress, _ := json.Marshal(map[string]any{
"type": "progress",
"job_id": job.ID,
"progress": job.Progress,
})
server.Publish("job:"+job.ID, progress)
}
Message Format
Messages sent to clients have this format:Copy
{
"topic": "chat:room1",
"data": {"type": "message", "user": "alice", "text": "Hello!"}
}
Copy
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
console.log(`Topic: ${msg.topic}`);
console.log(`Data:`, msg.data);
};
Performance Tips
Topic Granularity
Copy
// Too broad - everyone gets everything
server.Subscribe(s, "all_updates")
// Too narrow - many small topics
server.Subscribe(s, "user:123:item:456:field:name")
// Just right - meaningful groups
server.Subscribe(s, "user:"+userID)
server.Subscribe(s, "team:"+teamID)
Message Size
Copy
// Bad: send large objects
server.Publish(topic, entireLargeObject)
// Good: send just what changed
server.Publish(topic, []byte(`{"id":"123","status":"updated"}`))
Fan-out
Be aware of high fan-out:Copy
// 10,000 subscribers = 10,000 messages
server.Publish("global", message)
// Consider if all really need this update
Subscription Validation
Always validate subscriptions:Copy
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
if topic == "subscribe" {
var req struct{ Topic string `json:"topic"` }
json.Unmarshal(data, &req)
userID := s.Value().(string)
// Validate permission
if !canSubscribe(userID, req.Topic) {
s.Send(live.Message{
Topic: "error",
Data: []byte(`{"message":"not authorized"}`),
})
return
}
server.Subscribe(s, req.Topic)
}
}
func canSubscribe(userID, topic string) bool {
// Check topic patterns
if strings.HasPrefix(topic, "admin:") {
return isAdmin(userID)
}
if strings.HasPrefix(topic, "team:") {
teamID := strings.Split(strings.TrimPrefix(topic, "team:"), ":")[0]
return isTeamMember(userID, teamID)
}
if strings.HasPrefix(topic, "user:") {
targetUser := strings.Split(strings.TrimPrefix(topic, "user:"), ":")[0]
return targetUser == userID // Can only subscribe to own topics
}
return true
}
Best Practices
1. Use Namespaced Topics
Copy
// Good: clear hierarchy
"chat:room:123"
"user:456:notifications"
// Avoid: ambiguous
"room123"
"notifications"
2. Document Conventions
Copy
// Topic conventions:
// - user:{id} Direct messages to user
// - user:{id}:notifs User notifications
// - team:{id} Team broadcasts
// - chat:{roomId} Chat room messages
// - presence:{roomId} Room presence updates
3. Consistent Message Format
Copy
type Message struct {
Type string `json:"type"`
Data any `json:"data,omitempty"`
}
func publish(topic string, msgType string, data any) {
msg := Message{Type: msgType, Data: data}
bytes, _ := json.Marshal(msg)
server.Publish(topic, bytes)
}
// Usage
publish("chat:room1", "message", ChatMessage{User: "alice", Text: "Hi"})
publish("chat:room1", "typing", TypingIndicator{User: "bob"})