Skip to main content
By default, sync clients poll the server for changes. By integrating with the live package, you can push notifications to clients for instant updates.

Why Integrate?

Without LiveWith Live
Clients poll every 30sClients sync instantly
Higher latencySub-second latency
More bandwidthEfficient push notifications
Simpler setupRequires WebSocket

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                          SERVER                                  │
│                                                                  │
│  ┌────────────────┐            ┌────────────────┐              │
│  │  Sync Engine   │───notify──▶│  Live Server   │              │
│  │                │            │                │              │
│  │  Push/Pull/    │            │  Publish to    │              │
│  │  Snapshot      │            │  sync topics   │              │
│  └────────────────┘            └───────┬────────┘              │
│                                        │                        │
│                               WebSocket│                        │
└────────────────────────────────────────│────────────────────────┘

                              ┌──────────┴──────────┐
                              │                     │
                              ▼                     ▼
                     ┌────────────────┐    ┌────────────────┐
                     │   Client A     │    │   Client B     │
                     │                │    │                │
                     │ On notify:     │    │ On notify:     │
                     │ pull changes   │    │ pull changes   │
                     └────────────────┘    └────────────────┘

Server Setup

Step 1: Create Both Servers

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/go-mizu/mizu"
    "github.com/go-mizu/mizu/live"
    "github.com/go-mizu/mizu/sync"
    "github.com/go-mizu/mizu/sync/memory"
)

func main() {
    // Create Live server
    liveServer := live.New(live.Options{
        OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
            scope := r.URL.Query().Get("scope")
            if scope == "" {
                return nil, fmt.Errorf("scope required")
            }
            return scope, nil
        },
        OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
            // Handle subscribe requests
            if topic == "subscribe" {
                scope := s.Value().(string)
                liveServer.Subscribe(s, "sync:"+scope)
            }
        },
    })

    // Create Sync engine
    changeLog := memory.NewLog()
    syncEngine := sync.New(sync.Options{
        Log:    changeLog,
        Dedupe: memory.NewDedupe(),
        Apply:  applyMutation,
    })

    // Mount on app
    app := mizu.New()

    // Sync endpoints
    app.Post("/sync/push", pushHandler(syncEngine, liveServer))
    app.Get("/sync/pull", pullHandler(syncEngine))
    app.Get("/sync/snapshot", snapshotHandler(syncEngine))

    // WebSocket endpoint
    app.Get("/ws", mizu.Compat(liveServer.Handler()))

    app.Listen(":8080")
}

Step 2: Notify on Push

func pushHandler(engine *sync.Engine, live *live.Server) mizu.Handler {
    return func(c *mizu.Ctx) error {
        var mutations []sync.Mutation
        if err := c.BodyJSON(&mutations); err != nil {
            return c.Status(400).JSON(map[string]string{"error": "invalid"})
        }

        results, err := engine.Push(c.Context(), mutations)
        if err != nil {
            return c.Status(500).JSON(map[string]string{"error": err.Error()})
        }

        // Notify clients via Live
        for _, r := range results {
            if r.OK && len(r.Changes) > 0 {
                for _, change := range r.Changes {
                    // Publish to scope's sync topic
                    msg, _ := json.Marshal(map[string]uint64{"cursor": r.Cursor})
                    live.Publish("sync:"+change.Scope, msg)
                }
            }
        }

        return c.JSON(results)
    }
}

Client Setup (JavaScript)

class SyncClient {
    constructor(baseURL, scope) {
        this.baseURL = baseURL;
        this.scope = scope;
        this.cursor = 0;
        this.data = {};
        this.ws = null;
    }

    // Connect to Live for instant notifications
    connect() {
        this.ws = new WebSocket(`ws://${location.host}/ws?scope=${this.scope}`);

        this.ws.onopen = () => {
            console.log('Live connected');
            // Subscribe to sync notifications
            this.ws.send(JSON.stringify({topic: 'subscribe', data: {}}));
        };

        this.ws.onmessage = (event) => {
            const msg = JSON.parse(event.data);
            if (msg.topic === 'sync:' + this.scope) {
                const data = JSON.parse(msg.data);
                if (data.cursor > this.cursor) {
                    this.pull(); // Immediately sync!
                }
            }
        };

        this.ws.onclose = () => {
            console.log('Disconnected, reconnecting...');
            setTimeout(() => this.connect(), 1000);
        };
    }

    // Pull changes from server
    async pull() {
        const response = await fetch(
            `${this.baseURL}/sync/pull?scope=${this.scope}&cursor=${this.cursor}`
        );
        const result = await response.json();

        for (const change of result.changes) {
            this.applyChange(change);
            this.cursor = Math.max(this.cursor, change.cursor);
        }

        if (result.hasMore) {
            await this.pull();
        }
    }

    // Push mutation to server
    async push(name, args) {
        const mutation = {
            id: crypto.randomUUID(),
            scope: this.scope,
            name: name,
            args: args
        };

        const response = await fetch(`${this.baseURL}/sync/push`, {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify([mutation])
        });

        return response.json();
    }

    applyChange(change) {
        const data = JSON.parse(change.data);
        // Update local state based on change
        console.log('Applied change:', data);
    }
}

// Usage
const client = new SyncClient('http://localhost:8080', 'user:123');
client.connect();
await client.pull(); // Initial sync

Complete Example

Server

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    gosync "sync"

    "github.com/go-mizu/mizu"
    "github.com/go-mizu/mizu/live"
    "github.com/go-mizu/mizu/sync"
    "github.com/go-mizu/mizu/sync/memory"
)

type Todo struct {
    ID        string `json:"id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

var (
    todos   = make(map[string]Todo)
    todosMu gosync.RWMutex
)

func main() {
    // Live server
    liveServer := live.New(live.Options{
        OnAuth: func(ctx context.Context, r *http.Request) (any, error) {
            scope := r.URL.Query().Get("scope")
            if scope == "" {
                return nil, fmt.Errorf("scope required")
            }
            return scope, nil
        },
        OnMessage: func(ctx context.Context, s *live.Session, topic string, data []byte) {
            if topic == "subscribe" {
                scope := s.Value().(string)
                liveServer.Subscribe(s, "sync:"+scope)
            }
        },
        OnClose: func(s *live.Session, err error) {
            log.Printf("Client disconnected: %v", err)
        },
    })

    // Sync engine
    changeLog := memory.NewLog()
    syncEngine := sync.New(sync.Options{
        Log:    changeLog,
        Dedupe: memory.NewDedupe(),
        Apply:  applyMutation,
        Snapshot: func(ctx context.Context, scope string) (json.RawMessage, uint64, error) {
            todosMu.RLock()
            defer todosMu.RUnlock()
            data, _ := json.Marshal(todos)
            cursor, _ := changeLog.Cursor(ctx, scope)
            return data, cursor, nil
        },
    })

    // App
    app := mizu.New()

    app.Post("/sync/push", func(c *mizu.Ctx) error {
        var mutations []sync.Mutation
        c.BodyJSON(&mutations)

        results, _ := syncEngine.Push(c.Context(), mutations)

        // Notify via Live
        for _, r := range results {
            if r.OK {
                for _, ch := range r.Changes {
                    msg, _ := json.Marshal(map[string]uint64{"cursor": r.Cursor})
                    liveServer.Publish("sync:"+ch.Scope, msg)
                }
            }
        }

        return c.JSON(results)
    })

    app.Get("/sync/pull", func(c *mizu.Ctx) error {
        scope := c.Query("scope", sync.DefaultScope)
        cursor := uint64(c.QueryInt("cursor", 0))
        changes, hasMore, _ := syncEngine.Pull(c.Context(), scope, cursor, 100)
        return c.JSON(map[string]any{"changes": changes, "hasMore": hasMore})
    })

    app.Get("/sync/snapshot", func(c *mizu.Ctx) error {
        scope := c.Query("scope", sync.DefaultScope)
        data, cursor, _ := syncEngine.Snapshot(c.Context(), scope)
        return c.JSON(map[string]any{"data": json.RawMessage(data), "cursor": cursor})
    })

    app.Get("/ws", mizu.Compat(liveServer.Handler()))

    log.Println("Server on :8080")
    app.Listen(":8080")
}

func applyMutation(ctx context.Context, m sync.Mutation) ([]sync.Change, error) {
    scope := m.Scope
    if scope == "" {
        scope = sync.DefaultScope
    }

    switch m.Name {
    case "todo.create":
        var args struct {
            ID    string `json:"id"`
            Title string `json:"title"`
        }
        json.Unmarshal(m.Args, &args)

        todo := Todo{ID: args.ID, Title: args.Title}
        todosMu.Lock()
        todos[args.ID] = todo
        todosMu.Unlock()

        data, _ := json.Marshal(map[string]any{"op": "create", "todo": todo})
        return []sync.Change{{Scope: scope, Data: data}}, nil

    default:
        return nil, sync.ErrInvalidMutation
    }
}

Client HTML

<!DOCTYPE html>
<html>
<head><title>Real-time Sync Demo</title></head>
<body>
    <h1>Todos</h1>
    <input id="title" placeholder="New todo...">
    <button onclick="addTodo()">Add</button>
    <ul id="list"></ul>

    <script>
        const scope = 'demo';
        let cursor = 0;
        let ws;

        function connect() {
            ws = new WebSocket(`ws://${location.host}/ws?scope=${scope}`);
            ws.onopen = () => {
                ws.send(JSON.stringify({topic: 'subscribe', data: {}}));
            };
            ws.onmessage = (e) => {
                const msg = JSON.parse(e.data);
                if (msg.topic === 'sync:' + scope) {
                    pull();
                }
            };
            ws.onclose = () => setTimeout(connect, 1000);
        }

        async function pull() {
            const res = await fetch(`/sync/pull?scope=${scope}&cursor=${cursor}`);
            const {changes} = await res.json();
            changes.forEach(c => {
                cursor = Math.max(cursor, c.cursor);
                const data = JSON.parse(new TextDecoder().decode(
                    Uint8Array.from(atob(c.data), c => c.charCodeAt(0))
                ));
                if (data.op === 'create') {
                    addTodoToList(data.todo);
                }
            });
        }

        async function addTodo() {
            const title = document.getElementById('title').value;
            if (!title) return;
            await fetch('/sync/push', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify([{
                    id: crypto.randomUUID(),
                    scope: scope,
                    name: 'todo.create',
                    args: {id: crypto.randomUUID(), title: title}
                }])
            });
            document.getElementById('title').value = '';
        }

        function addTodoToList(todo) {
            const li = document.createElement('li');
            li.textContent = todo.title;
            document.getElementById('list').appendChild(li);
        }

        connect();
        pull();
    </script>
</body>
</html>

Best Practices

1. Use Live as Accelerator, Not Requirement

Sync should work without Live via polling:
// Fallback polling when WebSocket fails
setInterval(() => {
    if (!ws || ws.readyState !== WebSocket.OPEN) {
        pull();
    }
}, 30000);

2. Match Scopes to Topics

// Sync scope
scope := "user:" + userID

// Live topic
liveServer.Publish("sync:" + scope, msg)

3. Deduplicate Client-Side

ws.onmessage = (e) => {
    const {cursor} = JSON.parse(e.data);
    if (cursor > this.cursor) {
        this.pull(); // Only pull if new data
    }
};