From 1052529ca24d9b21cbbe199d98abacfa4949f50f Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Mon, 1 Jun 2026 17:31:42 +0200 Subject: [PATCH] feat(agent): hybrid SSE downlink with long-poll fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the bare long-poll wake listener with a hybrid server→agent downlink that consumes the new GET /api/internal/agent/events SSE stream first and falls back to the long-poll wake when SSE is unavailable or silently buffered. Resurrects the SSE client retired with WebRTC (signal_client.go) as events_client.go — a bounded-scanner reader (256 KiB line / 1 MiB event) that surfaces heartbeat comments as ping events so the consumer can detect liveness. runDownlink dispatches on the new [daemon] downlink config: - auto (default): SSE-first; after maxSSEFailures dead/buffered attempts fall back to long-poll for 5 min, then re-probe SSE. - sse: SSE only, no fallback (known-good networks / testing). - poll: the pre-0.14 long-poll wake only. A stream is "healthy" only if it delivers a frame within livenessTimeout (40s vs the server's 15s heartbeat). Crucially the liveness-timeout branch returns UNHEALTHY even if an earlier frame arrived: a proxy that flushes the connect preamble (one ping) then stalls must not pin the agent to SSE forever — that's the partial-buffering case the fallback exists for. event: command applies typed controls via the same OnControl callback /agent/sync uses (idempotent); event: sync triggers an immediate sync; ping is liveness-only. OpenEventStream rides MirrorPool failover for the initial connect; mid-stream drops close the channel and the loop reopens. Bump 0.14.0. --- internal/agent/daemon.go | 1 + internal/agent/downlink_test.go | 216 +++++++++++++++++++++++++++ internal/agent/events_client.go | 208 ++++++++++++++++++++++++++ internal/agent/events_client_test.go | 193 ++++++++++++++++++++++++ internal/agent/sync.go | 208 +++++++++++++++++++++++++- internal/cmd/daemon.go | 1 + internal/cmd/version.go | 2 +- internal/config/config.go | 5 + 8 files changed, 827 insertions(+), 7 deletions(-) create mode 100644 internal/agent/downlink_test.go create mode 100644 internal/agent/events_client.go create mode 100644 internal/agent/events_client_test.go diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 1f22171..53b4d18 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -38,6 +38,7 @@ type DaemonConfig struct { HWEncoders []string // HW-class encoder names found in `ffmpeg -encoders` HWDevices []string // device files + driver bins detected at probe time AutoUpgrade bool // honor server-flagged upgrades by downloading + restarting (default: true) + Downlink string // realtime downlink transport: "auto" (SSE+long-poll fallback) | "sse" | "poll" } // Daemon manages agent registration and the sync loop. diff --git a/internal/agent/downlink_test.go b/internal/agent/downlink_test.go new file mode 100644 index 0000000..99d8745 --- /dev/null +++ b/internal/agent/downlink_test.go @@ -0,0 +1,216 @@ +package agent + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestDownlinkMode(t *testing.T) { + cases := map[string]string{ + "": "auto", + "auto": "auto", + "AUTO": "auto", + " sse ": "sse", + "sse": "sse", + "poll": "poll", + "garbage": "auto", + } + for in, want := range cases { + sc, _ := newTestSyncClient("http://127.0.0.1:0") + sc.cfg.Downlink = in + if got := sc.downlinkMode(); got != want { + t.Errorf("downlinkMode(%q) = %q, want %q", in, got, want) + } + } +} + +func TestHandleDownlinkEvent_SyncNudge(t *testing.T) { + sc, _ := newTestSyncClient("http://127.0.0.1:0") + sc.handleDownlinkEvent(DownlinkEvent{Event: DownlinkEventSync, Data: json.RawMessage(`{"reason":"wake"}`)}) + + select { + case <-sc.SyncNow: + // good — TriggerSync fired + default: + t.Error("sync event did not trigger an immediate sync") + } +} + +func TestHandleDownlinkEvent_TypedControls(t *testing.T) { + sc, _ := newTestSyncClient("http://127.0.0.1:0") + + var gotAction, gotTask string + var gotDelete bool + sc.OnControl = func(action, taskID string, deleteFiles bool) { + gotAction, gotTask, gotDelete = action, taskID, deleteFiles + } + + payload := `{"controls":[{"action":"cancel","taskId":"task-xyz","deleteFiles":true}]}` + sc.handleDownlinkEvent(DownlinkEvent{Event: DownlinkEventCommand, Data: json.RawMessage(payload)}) + + if gotAction != "cancel" || gotTask != "task-xyz" || !gotDelete { + t.Errorf("OnControl got (%q,%q,%v), want (cancel,task-xyz,true)", gotAction, gotTask, gotDelete) + } +} + +func TestHandleDownlinkEvent_PingIsLivenessOnly(t *testing.T) { + sc, _ := newTestSyncClient("http://127.0.0.1:0") + controlCalled := false + sc.OnControl = func(string, string, bool) { controlCalled = true } + + sc.handleDownlinkEvent(DownlinkEvent{Event: DownlinkEventPing}) + + if controlCalled { + t.Error("ping must not invoke OnControl") + } + select { + case <-sc.SyncNow: + t.Error("ping must not trigger a sync") + default: + } +} + +func TestHandleDownlinkEvent_BadPayloadNoPanic(t *testing.T) { + sc, _ := newTestSyncClient("http://127.0.0.1:0") + sc.OnControl = func(string, string, bool) { t.Error("OnControl must not fire on bad payload") } + // Should log + return, not panic. + sc.handleDownlinkEvent(DownlinkEvent{Event: DownlinkEventCommand, Data: json.RawMessage(`{not json`)}) +} + +// TestRunEventStreamOnce_Healthy: a server that sends a heartbeat then a sync +// event, then closes → runEventStreamOnce returns true (healthy) and the sync +// nudge fired. +func TestRunEventStreamOnce_Healthy(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + f, _ := w.(http.Flusher) + w.Write([]byte(": hb\n\n")) + if f != nil { + f.Flush() + } + w.Write([]byte("event: sync\ndata: {}\n\n")) + if f != nil { + f.Flush() + } + // Return → response body closes → stream ends. + })) + defer srv.Close() + + sc, _ := newTestSyncClient(srv.URL) + sc.livenessTimeout = 500 * time.Millisecond + + healthy := sc.runEventStreamOnce(context.Background()) + if !healthy { + t.Error("expected healthy=true after receiving frames") + } + select { + case <-sc.SyncNow: + default: + t.Error("expected a sync nudge from the sync event") + } +} + +// TestRunEventStreamOnce_DeadOrBuffered: server connects 200 OK but sends +// nothing → liveness deadline fires → returns false (so auto mode falls back). +func TestRunEventStreamOnce_DeadOrBuffered(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + // Send NO frames — simulate a silently-buffering proxy. + <-r.Context().Done() + })) + defer srv.Close() + + sc, _ := newTestSyncClient(srv.URL) + sc.livenessTimeout = 150 * time.Millisecond + + start := time.Now() + healthy := sc.runEventStreamOnce(context.Background()) + if healthy { + t.Error("expected healthy=false when no frame arrives within liveness deadline") + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Errorf("liveness deadline did not fire promptly (took %s)", elapsed) + } +} + +// TestRunEventStreamOnce_PreambleThenStall: a partial-buffering proxy that +// flushes the connect preamble (one heartbeat) then goes silent must be treated +// as UNHEALTHY (false), so the auto fallback eventually triggers. This is the +// common buffering mode the zero-frame test doesn't cover. +func TestRunEventStreamOnce_PreambleThenStall(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + f, _ := w.(http.Flusher) + // Flush ONE heartbeat (the preamble) then stall — never send more. + w.Write([]byte(": connected hb=15000\n\n")) + if f != nil { + f.Flush() + } + <-r.Context().Done() + })) + defer srv.Close() + + sc, _ := newTestSyncClient(srv.URL) + sc.livenessTimeout = 150 * time.Millisecond + + if sc.runEventStreamOnce(context.Background()) { + t.Error("a stream that flushes one ping then stalls must be unhealthy (else fallback never triggers)") + } +} + +// TestRunEventStreamOnce_ConnectFail: dead server → false, no hang. +func TestRunEventStreamOnce_ConnectFail(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + url := srv.URL + srv.Close() // port now refuses + + sc, _ := newTestSyncClient(url) + sc.livenessTimeout = 500 * time.Millisecond + + if sc.runEventStreamOnce(context.Background()) { + t.Error("expected healthy=false on connect failure") + } +} + +// TestRunEventStreamOnce_CtxCancel: cancelling ctx returns promptly. +func TestRunEventStreamOnce_CtxCancel(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + <-r.Context().Done() + })) + defer srv.Close() + + sc, _ := newTestSyncClient(srv.URL) + sc.livenessTimeout = 10 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + done := make(chan struct{}) + go func() { + sc.runEventStreamOnce(ctx) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("runEventStreamOnce did not return after ctx cancel") + } +} diff --git a/internal/agent/events_client.go b/internal/agent/events_client.go new file mode 100644 index 0000000..a12301b --- /dev/null +++ b/internal/agent/events_client.go @@ -0,0 +1,208 @@ +package agent + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +// DownlinkEvent is one parsed Server-Sent Event from the agent events stream +// (GET /api/internal/agent/events). Event is the SSE "event:" name; Data is the +// raw "data:" payload (nil for heartbeat pings). +type DownlinkEvent struct { + Event string + Data json.RawMessage +} + +// CommandEvent is the payload of an "command" downlink event — typed control +// actions the server pushes for instant application (cancel/pause). Mirrors the +// `controls` field of /agent/sync so the same OnControl callback handles both. +type CommandEvent struct { + Controls []ControlAction `json:"controls"` +} + +// Downlink event names. Heartbeat pings surface as a distinct event so the +// consumer can reset its liveness deadline without acting on them. +const ( + DownlinkEventPing = "ping" // SSE comment line (`: hb`) — liveness only + DownlinkEventSync = "sync" // nudge: run a full /agent/sync + DownlinkEventCommand = "command" // typed control actions +) + +// Bounds on the SSE reader, identical in spirit to the retired WebRTC signal +// reader: a hostile or buggy server must not be able to grow daemon memory by +// streaming one unbounded line or unbounded `data:` continuation lines. +const ( + eventsSSEMaxLineBytes = 256 * 1024 + eventsSSEMaxEventBytes = 1024 * 1024 +) + +// EventStream wraps an open SSE downlink connection. Read from Events() until +// the channel closes (server recycle, network drop, or ctx cancel), then call +// Close() and reopen if you want to keep listening. Always defer Close(). +type EventStream struct { + resp *http.Response + cancel context.CancelFunc + events chan DownlinkEvent + errs chan error + done chan struct{} +} + +// Events streams server-pushed downlink events. Heartbeat comments surface as +// DownlinkEvent{Event: DownlinkEventPing}. The channel closes when the +// connection ends. +func (s *EventStream) Events() <-chan DownlinkEvent { return s.events } + +// Err returns the terminating error (if any) once Events() has closed. +func (s *EventStream) Err() error { + select { + case err := <-s.errs: + return err + default: + return nil + } +} + +// Close cancels the request and waits for the reader goroutine to drain. +// Safe to call more than once. +func (s *EventStream) Close() error { + if s.cancel != nil { + s.cancel() + } + if s.resp != nil { + s.resp.Body.Close() + } + <-s.done + return nil +} + +// OpenEventStream opens a long-lived SSE connection to the agent events +// downlink. Routed through MirrorPool failover for the INITIAL connect only +// (a mid-stream drop is surfaced as a closed channel, not retried here — the +// caller reopens). Caller MUST Close() (or cancel ctx) to free resources. +func (c *Client) OpenEventStream(ctx context.Context) (*EventStream, error) { + streamCtx, cancel := context.WithCancel(ctx) + + var resp *http.Response + err := c.withMirrorFailover(func(base string) error { + req, reqErr := http.NewRequestWithContext(streamCtx, http.MethodGet, base+"/api/internal/agent/events", nil) + if reqErr != nil { + return fmt.Errorf("create events request: %w", reqErr) + } + c.setHeaders(req) + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Cache-Control", "no-cache") + + // No-timeout client: the connection is intentionally long-lived; ctx + // controls cancellation (same as the wake long-poll). + r, doErr := c.wakeClient.Do(req) + if doErr != nil { + return fmt.Errorf("events request failed: %w", doErr) + } + if r.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(r.Body, 1<<10)) + r.Body.Close() + return &HTTPError{StatusCode: r.StatusCode, Message: strings.TrimSpace(string(body))} + } + resp = r + return nil + }) + if err != nil { + cancel() + return nil, err + } + + stream := &EventStream{ + resp: resp, + cancel: cancel, + events: make(chan DownlinkEvent, 8), + errs: make(chan error, 1), + done: make(chan struct{}), + } + go stream.read() + return stream, nil +} + +func (s *EventStream) read() { + defer close(s.done) + defer close(s.events) + + scanner := bufio.NewScanner(s.resp.Body) + scanner.Buffer(make([]byte, 16*1024), eventsSSEMaxLineBytes) + + ctx := s.resp.Request.Context() + var dataBuf bytes.Buffer + var eventName string + + emit := func(ev DownlinkEvent) bool { + select { + case s.events <- ev: + return true + case <-ctx.Done(): + return false + } + } + + for scanner.Scan() { + line := strings.TrimRight(scanner.Text(), "\r") + + if line == "" { + // Blank line ends an event — dispatch if we accumulated data. + if dataBuf.Len() > 0 { + name := eventName + if name == "" { + name = "message" + } + data := make([]byte, dataBuf.Len()) + copy(data, dataBuf.Bytes()) + if !emit(DownlinkEvent{Event: name, Data: json.RawMessage(data)}) { + return + } + } + dataBuf.Reset() + eventName = "" + continue + } + + if strings.HasPrefix(line, ":") { + // SSE comment / heartbeat — surface as a ping so the consumer resets + // its liveness deadline (and can tell a live stream from a silently + // buffered one that never delivers anything). + if !emit(DownlinkEvent{Event: DownlinkEventPing}) { + return + } + continue + } + if strings.HasPrefix(line, "event:") { + eventName = strings.TrimSpace(line[len("event:"):]) + continue + } + if strings.HasPrefix(line, "data:") { + payload := strings.TrimSpace(line[len("data:"):]) + if dataBuf.Len()+len(payload)+1 > eventsSSEMaxEventBytes { + select { + case s.errs <- fmt.Errorf("sse: event exceeded %d bytes", eventsSSEMaxEventBytes): + default: + } + return + } + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.WriteString(payload) + continue + } + // id:, retry:, unknown fields — ignored. + } + if err := scanner.Err(); err != nil { + select { + case s.errs <- err: + default: + } + } +} diff --git a/internal/agent/events_client_test.go b/internal/agent/events_client_test.go new file mode 100644 index 0000000..3ef079f --- /dev/null +++ b/internal/agent/events_client_test.go @@ -0,0 +1,193 @@ +package agent + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// sseServer returns an httptest server that writes the given raw SSE body and +// flushes, then holds the connection until the request context is cancelled (so +// the client drives the close, like the real long-lived endpoint). +func sseServer(t *testing.T, body string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte(body)); err != nil { + return + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + <-r.Context().Done() + })) +} + +func TestOpenEventStream_ParsesTypedEvents(t *testing.T) { + body := "retry: 2000\n\n" + + ": connected hb=15000\n\n" + + "event: sync\ndata: {\"reason\":\"wake\"}\n\n" + + "event: command\ndata: {\"controls\":[{\"action\":\"cancel\",\"taskId\":\"t1\"}]}\n\n" + srv := sseServer(t, body) + defer srv.Close() + + _, client := newTestSyncClient(srv.URL) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.OpenEventStream(ctx) + if err != nil { + t.Fatalf("OpenEventStream: %v", err) + } + defer stream.Close() + + var got []DownlinkEvent + timeout := time.After(2 * time.Second) + for len(got) < 3 { + select { + case ev, ok := <-stream.Events(): + if !ok { + t.Fatalf("stream closed early after %d events", len(got)) + } + got = append(got, ev) + case <-timeout: + t.Fatalf("timed out; got %d events: %+v", len(got), got) + } + } + + // First frame is the heartbeat comment surfaced as a ping. + if got[0].Event != DownlinkEventPing { + t.Errorf("event[0] = %q, want ping", got[0].Event) + } + if got[1].Event != DownlinkEventSync { + t.Errorf("event[1] = %q, want sync", got[1].Event) + } + if got[2].Event != DownlinkEventCommand { + t.Errorf("event[2] = %q, want command", got[2].Event) + } + if !strings.Contains(string(got[2].Data), "cancel") { + t.Errorf("command data missing payload: %s", got[2].Data) + } +} + +func TestOpenEventStream_MultiLineData(t *testing.T) { + // Two data: lines for one event must join with a newline. + body := "event: sync\ndata: line1\ndata: line2\n\n" + srv := sseServer(t, body) + defer srv.Close() + + _, client := newTestSyncClient(srv.URL) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.OpenEventStream(ctx) + if err != nil { + t.Fatalf("OpenEventStream: %v", err) + } + defer stream.Close() + + select { + case ev := <-stream.Events(): + if string(ev.Data) != "line1\nline2" { + t.Errorf("data = %q, want \"line1\\nline2\"", ev.Data) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestOpenEventStream_RejectsOversizedEvent(t *testing.T) { + // Many data: continuation lines until past eventsSSEMaxEventBytes → the + // reader surfaces an error and closes the channel (so the loop reconnects). + var b strings.Builder + b.WriteString("event: command\n") + chunk := "data: " + strings.Repeat("x", 4096) + "\n" + for b.Len() < eventsSSEMaxEventBytes+8192 { + b.WriteString(chunk) + } + b.WriteString("\n") + srv := sseServer(t, b.String()) + defer srv.Close() + + _, client := newTestSyncClient(srv.URL) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.OpenEventStream(ctx) + if err != nil { + t.Fatalf("OpenEventStream: %v", err) + } + defer stream.Close() + + // Drain until the channel closes (the oversized event must NOT be emitted). + timeout := time.After(2 * time.Second) + for { + select { + case ev, ok := <-stream.Events(): + if !ok { + if stream.Err() == nil { + t.Error("expected an error after oversized event, got nil") + } + return + } + if ev.Event == DownlinkEventCommand { + t.Fatalf("oversized command event must not be dispatched") + } + case <-timeout: + t.Fatal("timed out; channel never closed after oversized event") + } + } +} + +func TestOpenEventStream_Non200ReturnsError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + fmt.Fprint(w, `{"error":"not found"}`) + })) + defer srv.Close() + + _, client := newTestSyncClient(srv.URL) + _, err := client.OpenEventStream(context.Background()) + if err == nil { + t.Fatal("expected error on 404, got nil") + } + var httpErr *HTTPError + if !errors.As(err, &httpErr) || httpErr.StatusCode != http.StatusNotFound { + t.Errorf("expected HTTPError 404, got %v", err) + } +} + +func TestEventStream_CloseCancelsRead(t *testing.T) { + srv := sseServer(t, ": connected\n\n") + defer srv.Close() + + _, client := newTestSyncClient(srv.URL) + stream, err := client.OpenEventStream(context.Background()) + if err != nil { + t.Fatalf("OpenEventStream: %v", err) + } + + // Drain the initial ping. + select { + case <-stream.Events(): + case <-time.After(2 * time.Second): + t.Fatal("no initial ping") + } + + done := make(chan struct{}) + go func() { + stream.Close() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Close() did not return — read goroutine leaked") + } +} diff --git a/internal/agent/sync.go b/internal/agent/sync.go index ac856a5..f4e584e 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -2,8 +2,10 @@ package agent import ( "context" + "encoding/json" "log" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -15,6 +17,23 @@ const ( // SyncIntervalIdle is the sync interval when nobody is watching. // Keep this short enough to pick up stream requests quickly without hammering the server. SyncIntervalIdle = 10 * time.Second + + // --- Downlink (server→agent realtime) tuning --- + + // downlinkLivenessTimeout is the maximum time to wait for ANY SSE frame + // (heartbeat comment or event) before declaring the stream dead. The server + // heartbeats every ~15s; ~2.5× gives slack for jitter while still catching a + // path that connects 200 OK but silently buffers (delivers nothing until + // close) — the failure mode that justifies the long-poll fallback. + downlinkLivenessTimeout = 40 * time.Second + // sseReconnectDelay is the pause between SSE connection attempts. + sseReconnectDelay = 2 * time.Second + // maxSSEFailures is the number of consecutive failed/dead SSE attempts + // before "auto" mode falls back to the long-poll wake downlink. + maxSSEFailures = 3 + // downlinkFallbackWindow is how long to ride long-poll before re-probing SSE, + // so a transient proxy hiccup doesn't pin the agent on polling forever. + downlinkFallbackWindow = 5 * time.Minute ) // SyncClient handles bidirectional state synchronization between the CLI and server. @@ -53,6 +72,11 @@ type SyncClient struct { watching atomic.Bool interval atomic.Int64 // stored as nanoseconds + // livenessTimeout is the max wait for any SSE frame before the downlink + // treats the stream as dead/buffered. Defaults to downlinkLivenessTimeout; + // overridable in tests. + livenessTimeout time.Duration + // pendingDeleteConfirmed holds item IDs to report as deleted in the next sync. pendingDeleteMu sync.Mutex pendingDeleteConfirmed []int @@ -64,10 +88,11 @@ type SyncClient struct { // NewSyncClient creates a sync client. func NewSyncClient(client *Client, cfg DaemonConfig, state *LocalState) *SyncClient { sc := &SyncClient{ - client: client, - cfg: cfg, - state: state, - SyncNow: make(chan struct{}, 1), + client: client, + cfg: cfg, + state: state, + SyncNow: make(chan struct{}, 1), + livenessTimeout: downlinkLivenessTimeout, } sc.interval.Store(int64(SyncIntervalIdle)) return sc @@ -88,8 +113,9 @@ func (sc *SyncClient) TriggerSync() { // Run starts the adaptive sync loop. Blocks until ctx is cancelled. func (sc *SyncClient) Run(ctx context.Context) error { - // Start wake listener in background — triggers immediate syncs on demand. - go sc.runWakeListener(ctx) + // Start the realtime downlink in background — pushes immediate syncs + + // typed control commands on demand (SSE-first, long-poll fallback). + go sc.runDownlink(ctx) // Initial sync immediately sc.doSync(ctx) @@ -284,6 +310,176 @@ func (sc *SyncClient) runWakeListener(ctx context.Context) { } } +// runWakeListenerFor runs the long-poll wake listener for up to `dur`, then +// returns so the caller can re-probe SSE. Used as the auto-mode fallback. +func (sc *SyncClient) runWakeListenerFor(ctx context.Context, dur time.Duration) { + childCtx, cancel := context.WithTimeout(ctx, dur) + defer cancel() + sc.runWakeListener(childCtx) +} + +// downlinkMode resolves the configured downlink transport: +// - "auto" (default): SSE-first, fall back to long-poll wake if SSE is +// unavailable or silently buffered, then periodically re-probe SSE. +// - "sse": SSE only, no long-poll fallback (testing / known-good networks). +// - "poll": long-poll wake only (the pre-0.14 behavior). +func (sc *SyncClient) downlinkMode() string { + switch strings.ToLower(strings.TrimSpace(sc.cfg.Downlink)) { + case "poll": + return "poll" + case "sse": + return "sse" + default: + return "auto" + } +} + +// runDownlink is the server→agent realtime loop. It supersedes the bare +// long-poll wake listener: an SSE connection pushes typed control commands and +// sync nudges over a single persistent connection, with the long-poll wake as a +// buffering-tolerant fallback (long-poll survives proxies that buffer the +// response body and break SSE). Runs until ctx is cancelled. +func (sc *SyncClient) runDownlink(ctx context.Context) { + switch sc.downlinkMode() { + case "poll": + log.Printf("downlink: long-poll wake (downlink=poll)") + sc.runWakeListener(ctx) + case "sse": + log.Printf("downlink: SSE only (downlink=sse) — no long-poll fallback") + sc.runSSELoop(ctx, false) + default: + sc.runSSELoop(ctx, true) + } +} + +// runSSELoop maintains the SSE downlink, reconnecting across server recycles +// and transient drops. When allowFallback is true (auto mode), it switches to +// the long-poll wake after maxSSEFailures consecutive dead attempts, then +// re-probes SSE after downlinkFallbackWindow. +func (sc *SyncClient) runSSELoop(ctx context.Context, allowFallback bool) { + failures := 0 + for ctx.Err() == nil { + healthy := sc.runEventStreamOnce(ctx) + if ctx.Err() != nil { + return + } + if healthy { + failures = 0 + // A healthy stream that ended is a normal server recycle — reconnect. + sc.sleep(ctx, sseReconnectDelay) + continue + } + + failures++ + if allowFallback && failures >= maxSSEFailures { + log.Printf("downlink: SSE unavailable after %d attempts — falling back to long-poll for %s", failures, downlinkFallbackWindow) + sc.runWakeListenerFor(ctx, downlinkFallbackWindow) + failures = 0 + continue + } + sc.sleep(ctx, sseReconnectDelay) + } +} + +// runEventStreamOnce opens one SSE connection and consumes it until it dies or +// ctx is cancelled. Returns true if the stream was "healthy" — i.e. it +// delivered at least one frame (event or heartbeat) — and false if it failed to +// connect or delivered nothing within downlinkLivenessTimeout (dead or silently +// buffered). The caller uses that signal to decide whether to fall back. +func (sc *SyncClient) runEventStreamOnce(ctx context.Context) bool { + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + stream, err := sc.client.OpenEventStream(streamCtx) + if err != nil { + if ctx.Err() == nil { + log.Printf("downlink: SSE connect failed: %v", err) + } + return false + } + defer stream.Close() + + healthy := false + liveness := time.NewTimer(sc.livenessTimeout) + defer liveness.Stop() + + for { + select { + case <-ctx.Done(): + return healthy + case <-liveness.C: + // No frame within the deadline. The server heartbeats every ~15s, so + // silence past livenessTimeout (40s) means the path is dead OR + // silently buffering — INCLUDING a proxy that flushed the connect + // preamble (one ping) then stalled. Return false REGARDLESS of any + // earlier frame, so this counts toward the long-poll fallback; a + // stream that flushes one ping and goes quiet must not be treated as + // healthy or the fallback never triggers for partial bufferers. + if ctx.Err() == nil { + log.Printf("downlink: no SSE frame within %s — dropping (dead or buffered path)", sc.livenessTimeout) + } + return false + case ev, ok := <-stream.Events(): + if !ok { + if e := stream.Err(); e != nil && ctx.Err() == nil { + log.Printf("downlink: SSE stream ended: %v", e) + } + return healthy + } + if !healthy { + // First frame on this connection — the path flushes, so log once + // (on a silently-buffered path no frame ever arrives and we never + // claim connected). + log.Printf("downlink: SSE connected") + } + healthy = true + if !liveness.Stop() { + select { + case <-liveness.C: + default: + } + } + liveness.Reset(sc.livenessTimeout) + sc.handleDownlinkEvent(ev) + } + } +} + +// handleDownlinkEvent applies one pushed downlink event. Pings are liveness-only; +// "sync" nudges an immediate full sync; "command" carries typed control actions +// applied via the same OnControl callback /agent/sync uses (idempotent, so the +// authoritative sync re-delivering them is harmless). +func (sc *SyncClient) handleDownlinkEvent(ev DownlinkEvent) { + switch ev.Event { + case DownlinkEventPing: + // Liveness only. + case DownlinkEventSync: + sc.TriggerSync() + case DownlinkEventCommand: + var cmd CommandEvent + if err := json.Unmarshal(ev.Data, &cmd); err != nil { + log.Printf("downlink: bad command payload: %v", err) + return + } + for _, ctrl := range cmd.Controls { + log.Printf("downlink: control %s on task %s", ctrl.Action, ShortID(ctrl.TaskID)) + if sc.OnControl != nil { + sc.OnControl(ctrl.Action, ctrl.TaskID, ctrl.DeleteFiles) + } + } + default: + // Unknown event from a newer server — ignore forward-compatibly. + } +} + +// sleep blocks for d or until ctx is cancelled. +func (sc *SyncClient) sleep(ctx context.Context, d time.Duration) { + select { + case <-time.After(d): + case <-ctx.Done(): + } +} + func (sc *SyncClient) adjustInterval(watching bool) { prev := sc.watching.Load() sc.watching.Store(watching) diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index c3f870e..a0e1b49 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -179,6 +179,7 @@ func runDaemonStart() error { HWEncoders: hwDiag.Encoders, HWDevices: hwDiag.Devices, AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(), + Downlink: cfg.Daemon.Downlink, } // Create HTTP client with mirror failover so a `.com` block-out rolls diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 52d13d8..cfaa4f6 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.13.0" +var Version = "0.14.0" diff --git a/internal/config/config.go b/internal/config/config.go index c433846..f5ac09b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -151,6 +151,11 @@ type DaemonConfig struct { // logs "new version available" and the operator must run `unarr update` // manually. Default: true. Available since unarr 0.9.6. AutoUpgrade *bool `toml:"auto_upgrade"` + // Downlink selects the server→agent realtime transport. "auto" (default) + // uses an SSE push connection with the long-poll wake as a buffering-tolerant + // fallback; "sse" forces SSE only (no fallback); "poll" forces the pre-0.14 + // long-poll wake only. Empty = "auto". Available since unarr 0.14.0. + Downlink string `toml:"downlink"` } // AutoUpgradeEnabled returns the resolved AutoUpgrade flag — defaults to true