live package, you can push notifications to clients for instant updates.
Why Integrate?
| Without Live | With Live |
|---|---|
| Clients poll every 30s | Clients sync instantly |
| Higher latency | Sub-second latency |
| More bandwidth | Efficient push notifications |
| Simpler setup | Requires WebSocket |
Architecture
Copy
┌─────────────────────────────────────────────────────────────────┐
│ SERVER │
│ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ Sync Engine │───notify──▶│ Live Server │ │
│ │ │ │ │ │
│ │ Push/Pull/ │ │ Publish to │ │
│ │ Snapshot │ │ sync topics │ │
│ └────────────────┘ └───────┬────────┘ │
│ │ │
│ WebSocket│ │
└────────────────────────────────────────│────────────────────────┘
│
┌──────────┴──────────┐
│ │
▼ ▼
┌────────────────┐ ┌────────────────┐
│ Client A │ │ Client B │
│ │ │ │
│ On notify: │ │ On notify: │
│ pull changes │ │ pull changes │
└────────────────┘ └────────────────┘
Server Setup
Step 1: Create Both Servers
Copy
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/go-mizu/mizu"
"github.com/go-mizu/mizu/live"
"github.com/go-mizu/mizu/sync"
"github.com/go-mizu/mizu/sync/memory"
)
func main() {
// Create Live server
liveServer := live.New(live.Options{
OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
scope := r.URL.Query().Get("scope")
if scope == "" {
return nil, fmt.Errorf("scope required")
}
return scope, nil
},
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
// Handle subscribe requests
if topic == "subscribe" {
scope := s.Value().(string)
liveServer.Subscribe(s, "sync:"+scope)
}
},
})
// Create Sync engine
changeLog := memory.NewLog()
syncEngine := sync.New(sync.Options{
Log: changeLog,
Dedupe: memory.NewDedupe(),
Apply: applyMutation,
})
// Mount on app
app := mizu.New()
// Sync endpoints
app.Post("/sync/push", pushHandler(syncEngine, liveServer))
app.Get("/sync/pull", pullHandler(syncEngine))
app.Get("/sync/snapshot", snapshotHandler(syncEngine))
// WebSocket endpoint
app.Get("/ws", mizu.Compat(liveServer.Handler()))
app.Listen(":8080")
}
Step 2: Notify on Push
Copy
func pushHandler(engine *sync.Engine, live *live.Server) mizu.Handler {
return func(c *mizu.Ctx) error {
var mutations []sync.Mutation
if err := c.BodyJSON(&mutations); err != nil {
return c.Status(400).JSON(map[string]string{"error": "invalid"})
}
results, err := engine.Push(c.Context(), mutations)
if err != nil {
return c.Status(500).JSON(map[string]string{"error": err.Error()})
}
// Notify clients via Live
for _, r := range results {
if r.OK && len(r.Changes) > 0 {
for _, change := range r.Changes {
// Publish to scope's sync topic
msg, _ := json.Marshal(map[string]uint64{"cursor": r.Cursor})
live.Publish("sync:"+change.Scope, msg)
}
}
}
return c.JSON(results)
}
}
Client Setup (JavaScript)
Copy
class SyncClient {
constructor(baseURL, scope) {
this.baseURL = baseURL;
this.scope = scope;
this.cursor = 0;
this.data = {};
this.ws = null;
}
// Connect to Live for instant notifications
connect() {
this.ws = new WebSocket(`ws://${location.host}/ws?scope=${this.scope}`);
this.ws.onopen = () => {
console.log('Live connected');
// Subscribe to sync notifications
this.ws.send(JSON.stringify({topic: 'subscribe', data: {}}));
};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.topic === 'sync:' + this.scope) {
const data = JSON.parse(msg.data);
if (data.cursor > this.cursor) {
this.pull(); // Immediately sync!
}
}
};
this.ws.onclose = () => {
console.log('Disconnected, reconnecting...');
setTimeout(() => this.connect(), 1000);
};
}
// Pull changes from server
async pull() {
const response = await fetch(
`${this.baseURL}/sync/pull?scope=${this.scope}&cursor=${this.cursor}`
);
const result = await response.json();
for (const change of result.changes) {
this.applyChange(change);
this.cursor = Math.max(this.cursor, change.cursor);
}
if (result.hasMore) {
await this.pull();
}
}
// Push mutation to server
async push(name, args) {
const mutation = {
id: crypto.randomUUID(),
scope: this.scope,
name: name,
args: args
};
const response = await fetch(`${this.baseURL}/sync/push`, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify([mutation])
});
return response.json();
}
applyChange(change) {
const data = JSON.parse(change.data);
// Update local state based on change
console.log('Applied change:', data);
}
}
// Usage
const client = new SyncClient('http://localhost:8080', 'user:123');
client.connect();
await client.pull(); // Initial sync
Complete Example
Server
Copy
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
gosync "sync"
"github.com/go-mizu/mizu"
"github.com/go-mizu/mizu/live"
"github.com/go-mizu/mizu/sync"
"github.com/go-mizu/mizu/sync/memory"
)
type Todo struct {
ID string `json:"id"`
Title string `json:"title"`
Completed bool `json:"completed"`
}
var (
todos = make(map[string]Todo)
todosMu gosync.RWMutex
)
func main() {
// Live server
liveServer := live.New(live.Options{
OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
scope := r.URL.Query().Get("scope")
if scope == "" {
return nil, fmt.Errorf("scope required")
}
return scope, nil
},
OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
if topic == "subscribe" {
scope := s.Value().(string)
liveServer.Subscribe(s, "sync:"+scope)
}
},
OnClose: func(s *live.Session, err error) {
log.Printf("Client disconnected: %v", err)
},
})
// Sync engine
changeLog := memory.NewLog()
syncEngine := sync.New(sync.Options{
Log: changeLog,
Dedupe: memory.NewDedupe(),
Apply: applyMutation,
Snapshot: func(ctx context.Context, scope string) (json.RawMessage, uint64, error) {
todosMu.RLock()
defer todosMu.RUnlock()
data, _ := json.Marshal(todos)
cursor, _ := changeLog.Cursor(ctx, scope)
return data, cursor, nil
},
})
// App
app := mizu.New()
app.Post("/sync/push", func(c *mizu.Ctx) error {
var mutations []sync.Mutation
c.BodyJSON(&mutations)
results, _ := syncEngine.Push(c.Context(), mutations)
// Notify via Live
for _, r := range results {
if r.OK {
for _, ch := range r.Changes {
msg, _ := json.Marshal(map[string]uint64{"cursor": r.Cursor})
liveServer.Publish("sync:"+ch.Scope, msg)
}
}
}
return c.JSON(results)
})
app.Get("/sync/pull", func(c *mizu.Ctx) error {
scope := c.Query("scope", sync.DefaultScope)
cursor := uint64(c.QueryInt("cursor", 0))
changes, hasMore, _ := syncEngine.Pull(c.Context(), scope, cursor, 100)
return c.JSON(map[string]any{"changes": changes, "hasMore": hasMore})
})
app.Get("/sync/snapshot", func(c *mizu.Ctx) error {
scope := c.Query("scope", sync.DefaultScope)
data, cursor, _ := syncEngine.Snapshot(c.Context(), scope)
return c.JSON(map[string]any{"data": json.RawMessage(data), "cursor": cursor})
})
app.Get("/ws", mizu.Compat(liveServer.Handler()))
log.Println("Server on :8080")
app.Listen(":8080")
}
func applyMutation(ctx context.Context, m sync.Mutation) ([]sync.Change, error) {
scope := m.Scope
if scope == "" {
scope = sync.DefaultScope
}
switch m.Name {
case "todo.create":
var args struct {
ID string `json:"id"`
Title string `json:"title"`
}
json.Unmarshal(m.Args, &args)
todo := Todo{ID: args.ID, Title: args.Title}
todosMu.Lock()
todos[args.ID] = todo
todosMu.Unlock()
data, _ := json.Marshal(map[string]any{"op": "create", "todo": todo})
return []sync.Change{{Scope: scope, Data: data}}, nil
default:
return nil, sync.ErrInvalidMutation
}
}
Client HTML
Copy
<!DOCTYPE html>
<html>
<head><title>Real-time Sync Demo</title></head>
<body>
<h1>Todos</h1>
<input id="title" placeholder="New todo...">
<button onclick="addTodo()">Add</button>
<ul id="list"></ul>
<script>
const scope = 'demo';
let cursor = 0;
let ws;
function connect() {
ws = new WebSocket(`ws://${location.host}/ws?scope=${scope}`);
ws.onopen = () => {
ws.send(JSON.stringify({topic: 'subscribe', data: {}}));
};
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.topic === 'sync:' + scope) {
pull();
}
};
ws.onclose = () => setTimeout(connect, 1000);
}
async function pull() {
const res = await fetch(`/sync/pull?scope=${scope}&cursor=${cursor}`);
const {changes} = await res.json();
changes.forEach(c => {
cursor = Math.max(cursor, c.cursor);
const data = JSON.parse(new TextDecoder().decode(
Uint8Array.from(atob(c.data), c => c.charCodeAt(0))
));
if (data.op === 'create') {
addTodoToList(data.todo);
}
});
}
async function addTodo() {
const title = document.getElementById('title').value;
if (!title) return;
await fetch('/sync/push', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify([{
id: crypto.randomUUID(),
scope: scope,
name: 'todo.create',
args: {id: crypto.randomUUID(), title: title}
}])
});
document.getElementById('title').value = '';
}
function addTodoToList(todo) {
const li = document.createElement('li');
li.textContent = todo.title;
document.getElementById('list').appendChild(li);
}
connect();
pull();
</script>
</body>
</html>
Best Practices
1. Use Live as Accelerator, Not Requirement
Sync should work without Live via polling:Copy
// Fallback polling when WebSocket fails
setInterval(() => {
if (!ws || ws.readyState !== WebSocket.OPEN) {
pull();
}
}, 30000);
2. Match Scopes to Topics
Copy
// Sync scope
scope := "user:" + userID
// Live topic
liveServer.Publish("sync:" + scope, msg)
3. Deduplicate Client-Side
Copy
ws.onmessage = (e) => {
const {cursor} = JSON.parse(e.data);
if (cursor > this.cursor) {
this.pull(); // Only pull if new data
}
};