diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index a0e1b49..d129896 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -444,6 +444,10 @@ func runDaemonStart() error { // Trigger immediate sync when a download slot frees up manager.OnTaskDone = func() { d.TriggerSync() } + // Event-driven uplink: every status transition (resolving/downloading/ + // verifying/organizing/…) pushes to the server right away instead of waiting + // for the next adaptive tick. Coalesced by TriggerSync's buffered-1 channel. + manager.OnStateChange = func() { d.TriggerSync() } // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { @@ -458,7 +462,7 @@ func runDaemonStart() error { streamRegistry.mu.Lock() streamRegistry.cancels[t.ID] = streamCancel streamRegistry.mu.Unlock() - go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv) + go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv, func() { d.TriggerSync() }) } else { manager.Submit(ctx, t) } diff --git a/internal/cmd/stream_handler.go b/internal/cmd/stream_handler.go index fa61220..8765041 100644 --- a/internal/cmd/stream_handler.go +++ b/internal/cmd/stream_handler.go @@ -87,7 +87,7 @@ func cancelStreamTask(taskID string) { // handleStreamTask manages a streaming task lifecycle for active torrent downloads. // It creates a StreamEngine, buffers, sets the file on the persistent server, // and reports progress until the task is cancelled or the download completes. -func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config, agentClient *agent.Client, srv *engine.StreamServer) { +func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config, agentClient *agent.Client, srv *engine.StreamServer, onStateChange func()) { ctx, cancel := context.WithCancel(parentCtx) defer cancel() @@ -106,6 +106,10 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine }() task := engine.NewTaskFromAgent(at) + // Event-driven uplink: stream tasks transition outside the Manager (which + // wires this for downloads), so set it here too — resolving/downloading/ + // completed/failed get pushed to the server immediately. + task.SetOnChange(onStateChange) task.ResolvedMethod = engine.MethodTorrent reporter.Track(task) defer reporter.ReportFinal(context.Background(), task) diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 617d3d9..66585cd 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -34,6 +34,13 @@ type Manager struct { // Used by the daemon to trigger an immediate sync. OnTaskDone func() + // OnStateChange is called after EVERY successful task status transition + // (resolving → downloading → verifying → organizing → seeding → done/failed), + // wired by the daemon to trigger an immediate sync so the server sees state + // changes in near-realtime instead of on the next adaptive tick. Coalesced + // downstream (TriggerSync is a buffered-1 send), so bursts collapse safely. + OnStateChange func() + // recentlyFinished holds tasks that completed/failed since the last sync read. // The sync goroutine reads and clears this to include final states in the next sync. recentMu sync.Mutex @@ -82,6 +89,8 @@ func NewManager(cfg ManagerConfig, reporter *ProgressReporter, downloaders ...Do // Submit queues a task for download. Non-blocking if capacity available. func (m *Manager) Submit(ctx context.Context, at agent.Task) { task := NewTaskFromAgent(at) + // Event-driven uplink: push every status transition to the server immediately. + task.SetOnChange(m.OnStateChange) // Per-task cancellable context so CancelTask can unblock the goroutine taskCtx, taskCancel := context.WithCancel(ctx) diff --git a/internal/engine/task.go b/internal/engine/task.go index 09621e8..2685c6c 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -78,6 +78,12 @@ type Task struct { ClaimedAt time.Time StartedAt time.Time CompletedAt time.Time + + // onChange, when set, is called after every successful status Transition so + // the daemon can push the new state to the server immediately (event-driven + // uplink) instead of waiting for the next sync tick. Must be non-blocking — + // it's a coalescing TriggerSync. Set by the Manager at submit time. + onChange func() } // NewTaskFromAgent creates a Task from a server-claimed agent.Task. @@ -111,13 +117,15 @@ func NewTaskFromAgent(at agent.Task) *Task { } } -// Transition validates and performs a state transition. +// Transition validates and performs a state transition. On success it invokes +// the onChange hook (outside the lock) so the daemon can push the new state to +// the server immediately rather than waiting for the next sync tick. func (t *Task) Transition(to TaskStatus) error { t.mu.Lock() - defer t.mu.Unlock() allowed, ok := validTransitions[t.Status] if !ok { + t.mu.Unlock() return fmt.Errorf("no transitions from %s", t.Status) } for _, a := range allowed { @@ -129,12 +137,28 @@ func (t *Task) Transition(to TaskStatus) error { if to == StatusCompleted || to == StatusFailed { t.CompletedAt = time.Now() } + cb := t.onChange + t.mu.Unlock() + // Fire the event-driven uplink AFTER releasing the lock so a future + // heavier hook can't deadlock on the task mutex. + if cb != nil { + cb() + } return nil } } + t.mu.Unlock() return fmt.Errorf("invalid transition: %s -> %s", t.Status, to) } +// SetOnChange wires the post-transition hook. Call before the task starts +// transitioning (the Manager sets it at submit time). +func (t *Task) SetOnChange(fn func()) { + t.mu.Lock() + t.onChange = fn + t.mu.Unlock() +} + // GetStatus returns current status thread-safely. func (t *Task) GetStatus() TaskStatus { t.mu.RLock() diff --git a/internal/engine/task_test.go b/internal/engine/task_test.go index e9f6ccc..e99e339 100644 --- a/internal/engine/task_test.go +++ b/internal/engine/task_test.go @@ -216,3 +216,41 @@ func TestHasUntried(t *testing.T) { t.Error("all methods tried") } } + +func TestTransitionFiresOnChange(t *testing.T) { + task := NewTaskFromAgent(agent.Task{ID: "t1"}) // StatusClaimed + + var fired int + task.SetOnChange(func() { fired++ }) + + // Valid transition fires the hook. + if err := task.Transition(StatusResolving); err != nil { + t.Fatalf("Transition: %v", err) + } + if fired != 1 { + t.Errorf("onChange fired %d times, want 1 after a valid transition", fired) + } + + // Another valid transition fires again (event-driven, every transition). + if err := task.Transition(StatusDownloading); err != nil { + t.Fatalf("Transition: %v", err) + } + if fired != 2 { + t.Errorf("onChange fired %d times, want 2", fired) + } + + // Invalid transition must NOT fire the hook. + if err := task.Transition(StatusClaimed); err == nil { + t.Error("expected error on invalid transition downloading→claimed") + } + if fired != 2 { + t.Errorf("onChange fired %d times, want still 2 (no fire on invalid transition)", fired) + } +} + +func TestTransitionNilOnChangeNoPanic(t *testing.T) { + task := NewTaskFromAgent(agent.Task{ID: "t2"}) // no onChange set + if err := task.Transition(StatusResolving); err != nil { + t.Fatalf("Transition with nil onChange must not error: %v", err) + } +}