unarr/internal/agent/sync.go
Deivid Soto 1052529ca2 feat(agent): hybrid SSE downlink with long-poll fallback
Replace the bare long-poll wake listener with a hybrid server→agent
downlink that consumes the new GET /api/internal/agent/events SSE stream
first and falls back to the long-poll wake when SSE is unavailable or
silently buffered. Resurrects the SSE client retired with WebRTC
(signal_client.go) as events_client.go — a bounded-scanner reader
(256 KiB line / 1 MiB event) that surfaces heartbeat comments as ping
events so the consumer can detect liveness.

runDownlink dispatches on the new [daemon] downlink config:
  - auto (default): SSE-first; after maxSSEFailures dead/buffered attempts
    fall back to long-poll for 5 min, then re-probe SSE.
  - sse:  SSE only, no fallback (known-good networks / testing).
  - poll: the pre-0.14 long-poll wake only.

A stream is "healthy" only if it delivers a frame within livenessTimeout
(40s vs the server's 15s heartbeat). Crucially the liveness-timeout branch
returns UNHEALTHY even if an earlier frame arrived: a proxy that flushes
the connect preamble (one ping) then stalls must not pin the agent to SSE
forever — that's the partial-buffering case the fallback exists for.

event: command applies typed controls via the same OnControl callback
/agent/sync uses (idempotent); event: sync triggers an immediate sync;
ping is liveness-only. OpenEventStream rides MirrorPool failover for the
initial connect; mid-stream drops close the channel and the loop reopens.

Bump 0.14.0.
2026-06-01 17:31:42 +02:00

507 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agent
import (
"context"
"encoding/json"
"log"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
// SyncIntervalWatching is the sync interval when someone is viewing the web UI.
SyncIntervalWatching = 3 * time.Second
// SyncIntervalIdle is the sync interval when nobody is watching.
// Keep this short enough to pick up stream requests quickly without hammering the server.
SyncIntervalIdle = 10 * time.Second
// --- Downlink (server→agent realtime) tuning ---
// downlinkLivenessTimeout is the maximum time to wait for ANY SSE frame
// (heartbeat comment or event) before declaring the stream dead. The server
// heartbeats every ~15s; ~2.5× gives slack for jitter while still catching a
// path that connects 200 OK but silently buffers (delivers nothing until
// close) — the failure mode that justifies the long-poll fallback.
downlinkLivenessTimeout = 40 * time.Second
// sseReconnectDelay is the pause between SSE connection attempts.
sseReconnectDelay = 2 * time.Second
// maxSSEFailures is the number of consecutive failed/dead SSE attempts
// before "auto" mode falls back to the long-poll wake downlink.
maxSSEFailures = 3
// downlinkFallbackWindow is how long to ride long-poll before re-probing SSE,
// so a transient proxy hiccup doesn't pin the agent on polling forever.
downlinkFallbackWindow = 5 * time.Minute
)
// SyncClient handles bidirectional state synchronization between the CLI and server.
// It sends the CLI's full execution state and receives all pending server actions
// in a single HTTP round-trip, at an adaptive interval.
type SyncClient struct {
client *Client
cfg DaemonConfig
state *LocalState
// Callbacks — set by the daemon before calling Run.
OnNewTasks func(tasks []Task)
OnControl func(action, taskID string, deleteFiles bool)
OnStreamRequest func(req StreamRequest)
OnStreamSession func(sess StreamSession)
OnUpgrade func(version string)
OnScan func()
OnWatchingChange func(watching bool)
OnSyncSuccess func() // called after each successful sync (e.g. to update state file)
GetFreeSlots func() int
GetTaskStates func() []TaskState // returns current state of all active + recently finished tasks
// GetVPNState returns the live managed-VPN split-tunnel state (whether the
// WireGuard tunnel is up, the mode, and the exit server) so the web can track
// which agent holds the single WG slot.
GetVPNState func() (active bool, mode, server string)
// GetFunnelURL returns the CloudFlare Quick Tunnel public hostname if one
// is active, else "". Sent on every sync so the web picks it up live.
GetFunnelURL func() string
// OnDeleteFiles is called when the server requests file deletion from disk.
// It should delete the files and return the IDs of successfully deleted items.
OnDeleteFiles func(items []LibraryDeleteRequest) []int
// SyncNow triggers an immediate sync (e.g., on task completion).
SyncNow chan struct{}
watching atomic.Bool
interval atomic.Int64 // stored as nanoseconds
// livenessTimeout is the max wait for any SSE frame before the downlink
// treats the stream as dead/buffered. Defaults to downlinkLivenessTimeout;
// overridable in tests.
livenessTimeout time.Duration
// pendingDeleteConfirmed holds item IDs to report as deleted in the next sync.
pendingDeleteMu sync.Mutex
pendingDeleteConfirmed []int
// deleteInFlight tracks item IDs currently being processed or awaiting confirmation.
// Prevents the same file from being passed to OnDeleteFiles multiple times.
deleteInFlight map[int]struct{}
}
// NewSyncClient creates a sync client.
func NewSyncClient(client *Client, cfg DaemonConfig, state *LocalState) *SyncClient {
sc := &SyncClient{
client: client,
cfg: cfg,
state: state,
SyncNow: make(chan struct{}, 1),
livenessTimeout: downlinkLivenessTimeout,
}
sc.interval.Store(int64(SyncIntervalIdle))
return sc
}
// Watching returns whether someone is viewing the web UI.
func (sc *SyncClient) Watching() bool {
return sc.watching.Load()
}
// TriggerSync requests an immediate sync cycle.
func (sc *SyncClient) TriggerSync() {
select {
case sc.SyncNow <- struct{}{}:
default:
}
}
// Run starts the adaptive sync loop. Blocks until ctx is cancelled.
func (sc *SyncClient) Run(ctx context.Context) error {
// Start the realtime downlink in background — pushes immediate syncs +
// typed control commands on demand (SSE-first, long-poll fallback).
go sc.runDownlink(ctx)
// Initial sync immediately
sc.doSync(ctx)
ticker := time.NewTicker(sc.currentInterval())
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// Final sync to report latest state
finalCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
sc.doSync(finalCtx)
return nil
case <-ticker.C:
sc.doSync(ctx)
ticker.Reset(sc.currentInterval())
case <-sc.SyncNow:
sc.doSync(ctx)
ticker.Reset(sc.currentInterval())
}
}
}
func (sc *SyncClient) currentInterval() time.Duration {
return time.Duration(sc.interval.Load())
}
func (sc *SyncClient) doSync(ctx context.Context) {
req := sc.buildRequest()
resp, err := sc.client.Sync(ctx, req)
if err != nil {
if ctx.Err() == nil {
log.Printf("sync failed: %v", err)
}
return
}
sc.processResponse(resp)
sc.adjustInterval(resp.Watching)
if sc.OnSyncSuccess != nil {
sc.OnSyncSuccess()
}
}
func (sc *SyncClient) buildRequest() SyncRequest {
req := SyncRequest{
AgentID: sc.cfg.AgentID,
Name: sc.cfg.AgentName,
Version: sc.cfg.Version,
OS: runtime.GOOS,
Arch: runtime.GOARCH,
DownloadDir: sc.cfg.DownloadDir,
StreamPort: sc.cfg.StreamPort,
LanIP: sc.cfg.LanIP,
TailscaleIP: sc.cfg.TailscaleIP,
CanDelete: sc.cfg.CanDelete,
}
if sc.GetTaskStates != nil {
req.Tasks = sc.GetTaskStates()
} else {
req.Tasks = sc.state.Snapshot()
}
if free, total, err := DiskInfo(sc.cfg.DownloadDir); err == nil {
req.DiskFreeBytes = free
req.DiskTotalBytes = total
}
if sc.GetFreeSlots != nil {
req.FreeSlots = sc.GetFreeSlots()
}
if sc.GetVPNState != nil {
req.VPNActive, req.VPNMode, req.VPNServer = sc.GetVPNState()
}
if sc.GetFunnelURL != nil {
req.FunnelURL = sc.GetFunnelURL()
}
// Flush confirmed deletions from previous cycle.
// Once flushed, remove IDs from deleteInFlight — the server will stop sending
// them after this sync, so deduplication protection is no longer needed.
sc.pendingDeleteMu.Lock()
if len(sc.pendingDeleteConfirmed) > 0 {
req.DeleteConfirmed = sc.pendingDeleteConfirmed
for _, id := range sc.pendingDeleteConfirmed {
delete(sc.deleteInFlight, id)
}
sc.pendingDeleteConfirmed = nil
}
sc.pendingDeleteMu.Unlock()
return req
}
func (sc *SyncClient) processResponse(resp *SyncResponse) {
// New tasks
if len(resp.NewTasks) > 0 && sc.OnNewTasks != nil {
log.Printf("sync: received %d new task(s)", len(resp.NewTasks))
sc.OnNewTasks(resp.NewTasks)
}
// Control signals
for _, ctrl := range resp.Controls {
log.Printf("sync: control %s on task %s", ctrl.Action, ShortID(ctrl.TaskID))
if sc.OnControl != nil {
sc.OnControl(ctrl.Action, ctrl.TaskID, ctrl.DeleteFiles)
}
}
// Stream requests
for _, sr := range resp.StreamRequests {
if sc.OnStreamRequest != nil {
sc.OnStreamRequest(sr)
}
}
// HLS streaming sessions.
for _, ws := range resp.StreamSessions {
if sc.OnStreamSession != nil {
sc.OnStreamSession(ws)
}
}
// Upgrade
if resp.Upgrade != nil && resp.Upgrade.Version != "" && sc.OnUpgrade != nil {
sc.OnUpgrade(resp.Upgrade.Version)
}
// Scan
if resp.Scan && sc.OnScan != nil {
sc.OnScan()
}
// File deletions requested by the server — deduplicate against in-flight items
if len(resp.FilesToDelete) > 0 && sc.OnDeleteFiles != nil {
sc.pendingDeleteMu.Lock()
if sc.deleteInFlight == nil {
sc.deleteInFlight = make(map[int]struct{})
}
var newItems []LibraryDeleteRequest
for _, item := range resp.FilesToDelete {
if _, inFlight := sc.deleteInFlight[item.ItemID]; !inFlight {
newItems = append(newItems, item)
sc.deleteInFlight[item.ItemID] = struct{}{}
}
}
sc.pendingDeleteMu.Unlock()
if len(newItems) > 0 {
// Run deletions off the sync goroutine — disk I/O must not block the
// next sync tick. Confirmations are picked up on the next regular cycle.
go func(items []LibraryDeleteRequest) {
confirmed := sc.OnDeleteFiles(items)
if len(confirmed) > 0 {
sc.pendingDeleteMu.Lock()
sc.pendingDeleteConfirmed = append(sc.pendingDeleteConfirmed, confirmed...)
sc.pendingDeleteMu.Unlock()
}
}(newItems)
}
}
}
// runWakeListener holds a long-poll connection to /api/internal/agent/wake.
// When the server resolves it with wake=true (e.g., a stream was requested),
// it triggers an immediate sync so the CLI acts in <100ms instead of waiting
// for the next scheduled interval. Reconnects immediately after each response
// so coverage is continuous. Runs until ctx is cancelled.
func (sc *SyncClient) runWakeListener(ctx context.Context) {
const retryDelay = 2 * time.Second
for {
if ctx.Err() != nil {
return
}
woke, err := sc.client.WaitForWake(ctx)
if ctx.Err() != nil {
return
}
if err != nil {
log.Printf("wake listener: %v (retrying in %s)", err, retryDelay)
select {
case <-ctx.Done():
return
case <-time.After(retryDelay):
}
continue
}
if woke {
log.Printf("wake signal received — syncing immediately")
sc.TriggerSync()
}
// On timeout (woke=false) or after a wake, reconnect immediately.
}
}
// runWakeListenerFor runs the long-poll wake listener for up to `dur`, then
// returns so the caller can re-probe SSE. Used as the auto-mode fallback.
func (sc *SyncClient) runWakeListenerFor(ctx context.Context, dur time.Duration) {
childCtx, cancel := context.WithTimeout(ctx, dur)
defer cancel()
sc.runWakeListener(childCtx)
}
// downlinkMode resolves the configured downlink transport:
// - "auto" (default): SSE-first, fall back to long-poll wake if SSE is
// unavailable or silently buffered, then periodically re-probe SSE.
// - "sse": SSE only, no long-poll fallback (testing / known-good networks).
// - "poll": long-poll wake only (the pre-0.14 behavior).
func (sc *SyncClient) downlinkMode() string {
switch strings.ToLower(strings.TrimSpace(sc.cfg.Downlink)) {
case "poll":
return "poll"
case "sse":
return "sse"
default:
return "auto"
}
}
// runDownlink is the server→agent realtime loop. It supersedes the bare
// long-poll wake listener: an SSE connection pushes typed control commands and
// sync nudges over a single persistent connection, with the long-poll wake as a
// buffering-tolerant fallback (long-poll survives proxies that buffer the
// response body and break SSE). Runs until ctx is cancelled.
func (sc *SyncClient) runDownlink(ctx context.Context) {
switch sc.downlinkMode() {
case "poll":
log.Printf("downlink: long-poll wake (downlink=poll)")
sc.runWakeListener(ctx)
case "sse":
log.Printf("downlink: SSE only (downlink=sse) — no long-poll fallback")
sc.runSSELoop(ctx, false)
default:
sc.runSSELoop(ctx, true)
}
}
// runSSELoop maintains the SSE downlink, reconnecting across server recycles
// and transient drops. When allowFallback is true (auto mode), it switches to
// the long-poll wake after maxSSEFailures consecutive dead attempts, then
// re-probes SSE after downlinkFallbackWindow.
func (sc *SyncClient) runSSELoop(ctx context.Context, allowFallback bool) {
failures := 0
for ctx.Err() == nil {
healthy := sc.runEventStreamOnce(ctx)
if ctx.Err() != nil {
return
}
if healthy {
failures = 0
// A healthy stream that ended is a normal server recycle — reconnect.
sc.sleep(ctx, sseReconnectDelay)
continue
}
failures++
if allowFallback && failures >= maxSSEFailures {
log.Printf("downlink: SSE unavailable after %d attempts — falling back to long-poll for %s", failures, downlinkFallbackWindow)
sc.runWakeListenerFor(ctx, downlinkFallbackWindow)
failures = 0
continue
}
sc.sleep(ctx, sseReconnectDelay)
}
}
// runEventStreamOnce opens one SSE connection and consumes it until it dies or
// ctx is cancelled. Returns true if the stream was "healthy" — i.e. it
// delivered at least one frame (event or heartbeat) — and false if it failed to
// connect or delivered nothing within downlinkLivenessTimeout (dead or silently
// buffered). The caller uses that signal to decide whether to fall back.
func (sc *SyncClient) runEventStreamOnce(ctx context.Context) bool {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := sc.client.OpenEventStream(streamCtx)
if err != nil {
if ctx.Err() == nil {
log.Printf("downlink: SSE connect failed: %v", err)
}
return false
}
defer stream.Close()
healthy := false
liveness := time.NewTimer(sc.livenessTimeout)
defer liveness.Stop()
for {
select {
case <-ctx.Done():
return healthy
case <-liveness.C:
// No frame within the deadline. The server heartbeats every ~15s, so
// silence past livenessTimeout (40s) means the path is dead OR
// silently buffering — INCLUDING a proxy that flushed the connect
// preamble (one ping) then stalled. Return false REGARDLESS of any
// earlier frame, so this counts toward the long-poll fallback; a
// stream that flushes one ping and goes quiet must not be treated as
// healthy or the fallback never triggers for partial bufferers.
if ctx.Err() == nil {
log.Printf("downlink: no SSE frame within %s — dropping (dead or buffered path)", sc.livenessTimeout)
}
return false
case ev, ok := <-stream.Events():
if !ok {
if e := stream.Err(); e != nil && ctx.Err() == nil {
log.Printf("downlink: SSE stream ended: %v", e)
}
return healthy
}
if !healthy {
// First frame on this connection — the path flushes, so log once
// (on a silently-buffered path no frame ever arrives and we never
// claim connected).
log.Printf("downlink: SSE connected")
}
healthy = true
if !liveness.Stop() {
select {
case <-liveness.C:
default:
}
}
liveness.Reset(sc.livenessTimeout)
sc.handleDownlinkEvent(ev)
}
}
}
// handleDownlinkEvent applies one pushed downlink event. Pings are liveness-only;
// "sync" nudges an immediate full sync; "command" carries typed control actions
// applied via the same OnControl callback /agent/sync uses (idempotent, so the
// authoritative sync re-delivering them is harmless).
func (sc *SyncClient) handleDownlinkEvent(ev DownlinkEvent) {
switch ev.Event {
case DownlinkEventPing:
// Liveness only.
case DownlinkEventSync:
sc.TriggerSync()
case DownlinkEventCommand:
var cmd CommandEvent
if err := json.Unmarshal(ev.Data, &cmd); err != nil {
log.Printf("downlink: bad command payload: %v", err)
return
}
for _, ctrl := range cmd.Controls {
log.Printf("downlink: control %s on task %s", ctrl.Action, ShortID(ctrl.TaskID))
if sc.OnControl != nil {
sc.OnControl(ctrl.Action, ctrl.TaskID, ctrl.DeleteFiles)
}
}
default:
// Unknown event from a newer server — ignore forward-compatibly.
}
}
// sleep blocks for d or until ctx is cancelled.
func (sc *SyncClient) sleep(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}
func (sc *SyncClient) adjustInterval(watching bool) {
prev := sc.watching.Load()
sc.watching.Store(watching)
var newInterval time.Duration
if watching {
newInterval = SyncIntervalWatching
} else {
newInterval = SyncIntervalIdle
}
if sc.interval.Swap(int64(newInterval)) != int64(newInterval) {
log.Printf("sync: interval=%s (watching=%v)", newInterval, watching)
}
// Trigger an immediate sync when entering watching mode so stream requests
// are picked up right away without waiting for the next scheduled interval.
if watching && !prev {
sc.TriggerSync()
}
if prev != watching && sc.OnWatchingChange != nil {
sc.OnWatchingChange(watching)
}
}