feat(agent): auto-resume interrupted downloads after a daemon restart

A daemon restart used to abandon in-flight downloads: the in-memory queue was
lost and the web doesn't re-dispatch a stuck task, so the user had to retry
manually. The bytes already persisted (mmap + anacrolix's piece-completion DB
keyed by info_hash; debrid via Range; usenet via its tracker) — the daemon just
didn't re-attempt the work.

ActiveTaskStore persists each in-flight download's agent.Task payload to
active-tasks.json; the daemon re-submits them on startup so the downloaders
resume the partial data. manager.Submit now dedupes by task ID (so the startup
re-submit and a later web re-dispatch can't both run), and recordFinished
removes a task from the store only on a genuine terminal state. A shuttingDown
flag, set before Shutdown cancels the task contexts, keeps shutdown-interrupted
tasks in the store so they resume on the next start.
Stream/seed/upgrade tasks aren't persisted; ForceStart is cleared on resume.
This commit is contained in:
Deivid Soto 2026-05-31 22:44:05 +02:00
parent b708bb8ab2
commit 445da233c0
6 changed files with 399 additions and 9 deletions

View file

@ -0,0 +1,105 @@
package agent
import (
"encoding/json"
"os"
"path/filepath"
"sync"
"github.com/torrentclaw/unarr/internal/config"
)
// activeTasksFilePathFn resolves the on-disk location of the resume store
// (active-tasks.json inside the configured data directory). It is a variable
// rather than a function so tests can swap in a temp path.
var activeTasksFilePathFn = func() string {
	dataDir := config.DataDir()
	return filepath.Join(dataDir, "active-tasks.json")
}
// ActiveTaskStore persists the dispatch payloads (agent.Task) of in-flight
// DOWNLOAD tasks so the daemon can re-submit them after a restart and have the
// downloaders resume the partial data — torrent via the persisted
// piece-completion DB, debrid via HTTP Range, usenet via its segment tracker.
//
// Distinct from LocalState (tasks.json), which holds transient status/progress
// for syncing to the web; this holds the re-dispatch payload needed to restart
// the work. An entry is added when a download starts and removed when it
// reaches a genuine terminal state (completed / failed / cancelled) — but NOT
// when the daemon is shutting down, so an interrupted download survives the
// restart and resumes.
//
// The store contains a mutex, so it must be used through a pointer (as
// returned by NewActiveTaskStore) and never copied.
type ActiveTaskStore struct {
	// mu guards tasks; Add, Remove and Load take it before touching the map.
	mu sync.Mutex
	// tasks is the current in-memory set, keyed by Task.ID.
	tasks map[string]Task
}
// NewActiveTaskStore returns a store with an initialised but empty task set.
// Nothing is read from disk here; call Load() to hydrate it.
func NewActiveTaskStore() *ActiveTaskStore {
	store := new(ActiveTaskStore)
	store.tasks = map[string]Task{}
	return store
}
// Add stores t under its ID, replacing any entry already held for that ID,
// then writes the whole set back to disk.
func (s *ActiveTaskStore) Add(t Task) {
	s.mu.Lock()
	defer s.mu.Unlock()

	id := t.ID
	s.tasks[id] = t
	s.flushLocked()
}
// Remove deletes the entry for taskID and writes the set back to disk. When
// the ID is not present nothing changes and the file is not rewritten.
func (s *ActiveTaskStore) Remove(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, present := s.tasks[taskID]; present {
		delete(s.tasks, taskID)
		s.flushLocked()
	}
}
// Load hydrates the store from the on-disk file and returns the tasks it now
// holds. A missing, unreadable, or unparseable file yields nil and leaves the
// store untouched (a fresh daemon has nothing to resume). Entries with an empty
// ID are skipped, and when the file repeats an ID the last occurrence wins.
// Intended to be called once at startup, before any Add/Remove.
func (s *ActiveTaskStore) Load() []Task {
	raw, readErr := os.ReadFile(activeTasksFilePathFn())
	if readErr != nil {
		return nil
	}

	var persisted []Task
	if decodeErr := json.Unmarshal(raw, &persisted); decodeErr != nil {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Rebuild the in-memory set from scratch, keyed by task ID.
	byID := make(map[string]Task, len(persisted))
	for _, entry := range persisted {
		if entry.ID == "" {
			continue
		}
		byID[entry.ID] = entry
	}
	s.tasks = byID

	// Return the de-duplicated set rather than the raw file contents.
	resumable := make([]Task, 0, len(byID))
	for _, entry := range byID {
		resumable = append(resumable, entry)
	}
	return resumable
}
// flushLocked writes the current set to disk by writing a sibling ".tmp" file
// and renaming it over the real path, so a reader never observes a
// half-written active-tasks.json. Caller holds s.mu.
//
// Best-effort: a write failure is non-fatal (the in-memory set stays correct;
// at worst a crash before the next flush loses one resume entry). On a failed
// write or rename the temp file is removed so no stale or partial
// "active-tasks.json.tmp" is left behind in the data directory.
func (s *ActiveTaskStore) flushLocked() {
	tasks := make([]Task, 0, len(s.tasks))
	for _, t := range s.tasks {
		tasks = append(tasks, t)
	}
	data, err := json.MarshalIndent(tasks, "", " ")
	if err != nil {
		return
	}

	path := activeTasksFilePathFn()
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return
	}

	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		// WriteFile may have created or partially written the file before failing.
		_ = os.Remove(tmp) // best-effort cleanup; nothing more to do on failure
		return
	}
	if err := os.Rename(tmp, path); err != nil {
		// The previous active-tasks.json (if any) is still intact; just drop
		// the orphaned temp file.
		_ = os.Remove(tmp) // best-effort cleanup; nothing more to do on failure
	}
}

View file

@ -0,0 +1,75 @@
package agent
import (
"path/filepath"
"testing"
)
// withTempStorePath redirects activeTasksFilePathFn to a file inside a
// per-test temp directory and registers a cleanup that puts the original
// resolver back when the test ends.
func withTempStorePath(t *testing.T) {
	t.Helper()

	tmpFile := filepath.Join(t.TempDir(), "active-tasks.json")
	saved := activeTasksFilePathFn
	t.Cleanup(func() { activeTasksFilePathFn = saved })

	activeTasksFilePathFn = func() string { return tmpFile }
}
// TestActiveTaskStore_AddLoadRoundTrip adds two tasks through one store and
// checks that a second, freshly constructed store reads both back from disk
// with their identifying fields intact.
func TestActiveTaskStore_AddLoadRoundTrip(t *testing.T) {
	withTempStorePath(t)

	writer := NewActiveTaskStore()
	seed := []Task{
		{ID: "a", InfoHash: "hashA", Title: "Movie A", Mode: "download"},
		{ID: "b", NzbID: "nzbB", Title: "Show B"},
	}
	for _, tk := range seed {
		writer.Add(tk)
	}

	// A fresh store hydrated from disk must see both.
	reader := NewActiveTaskStore()
	loaded := reader.Load()
	if got := len(loaded); got != 2 {
		t.Fatalf("Load returned %d tasks, want 2", got)
	}

	// Load's order is unspecified (map iteration), so index by ID first.
	byID := make(map[string]Task, len(loaded))
	for i := range loaded {
		byID[loaded[i].ID] = loaded[i]
	}

	a, b := byID["a"], byID["b"]
	if !(a.InfoHash == "hashA" && a.Title == "Movie A") {
		t.Errorf("task a not round-tripped: %+v", a)
	}
	if b.NzbID != "nzbB" {
		t.Errorf("task b not round-tripped: %+v", b)
	}
}
// TestActiveTaskStore_Remove checks that removing one of two tasks leaves only
// the other on disk, and that removing an unknown ID is harmless.
func TestActiveTaskStore_Remove(t *testing.T) {
	withTempStorePath(t)

	store := NewActiveTaskStore()
	for _, tk := range []Task{{ID: "a", Title: "A"}, {ID: "b", Title: "B"}} {
		store.Add(tk)
	}

	store.Remove("a")
	store.Remove("missing") // no-op

	loaded := NewActiveTaskStore().Load()
	onlyB := len(loaded) == 1 && loaded[0].ID == "b"
	if !onlyB {
		t.Fatalf("after Remove(a), Load = %+v, want only b", loaded)
	}
}
// TestActiveTaskStore_Overwrite checks that adding a task whose ID is already
// present replaces the stored entry instead of creating a second one.
func TestActiveTaskStore_Overwrite(t *testing.T) {
	withTempStorePath(t)

	store := NewActiveTaskStore()
	for _, title := range []string{"old", "new"} {
		store.Add(Task{ID: "a", Title: title}) // same id replaces
	}

	loaded := NewActiveTaskStore().Load()
	replaced := len(loaded) == 1 && loaded[0].Title == "new"
	if !replaced {
		t.Fatalf("overwrite failed: %+v", loaded)
	}
}
// TestActiveTaskStore_LoadMissingFile checks that Load returns nil when the
// store file has never been written.
func TestActiveTaskStore_LoadMissingFile(t *testing.T) {
	withTempStorePath(t) // temp dir, no file written yet

	store := NewActiveTaskStore()
	got := store.Load()
	if got != nil {
		t.Errorf("Load on missing file = %+v, want nil", got)
	}
}

View file

@ -316,6 +316,11 @@ func runDaemonStart() error {
},
}, reporter, torrentDl, debridDl, usenetDl)
// Resume store: persist in-flight downloads so a daemon restart can re-submit
// them (the downloaders resume the partial data). Wire it before any Submit.
taskStore := agent.NewActiveTaskStore()
manager.SetTaskStore(taskStore)
// Create persistent stream server
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP)
@ -426,6 +431,20 @@ func runDaemonStart() error {
}
}
// Resume downloads interrupted by the previous shutdown/crash. Re-submit
// each persisted task; its downloader picks up the partial data (torrent via
// the piece-completion DB, debrid via Range, usenet via its tracker). Done
// before the sync loop starts; a later web re-dispatch of the same id is
// deduped by the manager.
if resume := taskStore.Load(); len(resume) > 0 {
log.Printf("[resume] re-submitting %d interrupted download(s)", len(resume))
for _, t := range resume {
t.ForceStart = false // respect MaxConcurrent on bulk auto-resume
log.Printf("[resume] %s — %s", agent.ShortID(t.ID), t.Title)
manager.Submit(ctx, t)
}
}
// Wire: sync receives control signals → act on manager
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
switch action {
@ -847,13 +866,16 @@ func runDaemonStart() error {
cancelStreamContexts()
cancelAllPlayerSessions()
streamSrv.Shutdown(context.Background())
cancel()
// Give active downloads 30s to finish
// Drain active downloads BEFORE cancelling the daemon context. Shutdown
// sets shuttingDown + cancels each task context itself, so interrupted
// downloads keep their resume-store entry. Cancelling the shared ctx first
// would make them look like genuine failures and wipe the entry → no resume.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
manager.Shutdown(shutdownCtx)
cancel()
d.Deregister()
fmt.Println(" Daemon stopped.")
return nil

View file

@ -4,6 +4,7 @@ import (
"context"
"log"
"sync"
"sync/atomic"
"github.com/torrentclaw/unarr/internal/agent"
)
@ -37,8 +38,26 @@ type Manager struct {
// The sync goroutine reads and clears this to include final states in the next sync.
recentMu sync.Mutex
recentFinished []agent.TaskState
// taskStore persists in-flight download payloads so the daemon can re-submit
// them after a restart (the downloaders resume the partial data). nil = no
// persistence. shuttingDown gates removal: a task interrupted by a graceful
// shutdown keeps its store entry (so it resumes), unlike a genuine terminal.
taskStore taskPersister
shuttingDown atomic.Bool
}
// taskPersister is the resume store the manager records in-flight downloads to.
// Satisfied by *agent.ActiveTaskStore; an interface so tests can inject a fake.
type taskPersister interface {
	// Add records the dispatch payload of a task; the manager calls it from
	// Submit for tasks it considers resumable downloads.
	Add(agent.Task)
	// Remove drops the entry for taskID; the manager calls it from
	// recordFinished unless a shutdown is in progress.
	Remove(taskID string)
}
// SetTaskStore attaches the resume store used to persist in-flight downloads.
// It is optional: with no store set, downloads are not persisted for
// cross-restart resume. Call it once, before the first Submit.
func (m *Manager) SetTaskStore(s taskPersister) {
	m.taskStore = s
}
// NewManager creates a download manager.
func NewManager(cfg ManagerConfig, reporter *ProgressReporter, downloaders ...Downloader) *Manager {
if cfg.MaxConcurrent <= 0 {
@ -68,10 +87,28 @@ func (m *Manager) Submit(ctx context.Context, at agent.Task) {
taskCtx, taskCancel := context.WithCancel(ctx)
m.activeMu.Lock()
// Dedup: a task can arrive twice — once when the daemon re-submits it from
// the resume store on startup, and again when the web re-dispatches it. The
// second arrival must NOT launch a parallel goroutine for the same files.
if _, exists := m.active[task.ID]; exists {
m.activeMu.Unlock()
taskCancel()
log.Printf("[%s] already active — ignoring duplicate submit", agent.ShortID(task.ID))
return
}
m.active[task.ID] = task
m.cancels[task.ID] = taskCancel
m.activeMu.Unlock()
// Persist real downloads so a daemon restart can resume them (torrent via
// the piece-completion DB, debrid via Range, usenet via its tracker). Stream
// and seed-file tasks are transient — not resumed. Upgrade downloads
// (ReplacePath set) are excluded too: re-running one after an interrupted
// organize could double-download or replace the wrong target.
if m.taskStore != nil && (at.Mode == "" || at.Mode == "download") && at.ReplacePath == "" {
m.taskStore.Add(at)
}
m.reporter.Track(task)
// Force start: bypass semaphore (like Transmission's "Force Start")
@ -176,6 +213,13 @@ func (m *Manager) TaskStates() []agent.TaskState {
// recordFinished stores a completed/failed task for the next sync cycle.
func (m *Manager) recordFinished(update agent.StatusUpdate) {
// Drop from the resume store on a genuine terminal state (completed / failed
// / user-cancelled). A shutdown-interrupted task is NOT removed — it stays so
// the daemon re-submits and resumes it on the next start.
if m.taskStore != nil && !m.shuttingDown.Load() {
m.taskStore.Remove(update.TaskID)
}
m.recentMu.Lock()
defer m.recentMu.Unlock()
m.recentFinished = append(m.recentFinished, agent.TaskStateFromUpdate(update))
@ -271,6 +315,23 @@ func (m *Manager) Wait() {
// Shutdown stops accepting tasks and waits for active downloads to finish.
func (m *Manager) Shutdown(ctx context.Context) {
// Flag shutdown BEFORE cancelling task contexts: tasks interrupted by the
// shutdown then keep their resume-store entry (recordFinished skips the
// removal) so the daemon re-submits and resumes them on the next start.
m.shuttingDown.Store(true)
// Cancel every task context NOW (before waiting). Downloads block on their
// context, so this is what actually unblocks them — and because shuttingDown
// is already set, their recordFinished keeps the resume entry. (Waiting first
// would just stall until the timeout, and relying on the daemon's outer ctx
// cancel would race ahead of shuttingDown and wipe the entries.)
m.activeMu.Lock()
for id, cancel := range m.cancels {
cancel()
delete(m.cancels, id)
}
m.activeMu.Unlock()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
@ -281,7 +342,7 @@ func (m *Manager) Shutdown(ctx context.Context) {
select {
case <-done:
case <-ctx.Done():
log.Println("shutdown timeout, cancelling active downloads")
log.Println("shutdown timeout, abandoning active downloads")
}
// Shutdown all downloaders
@ -291,12 +352,7 @@ func (m *Manager) Shutdown(ctx context.Context) {
}
}
// Clean active map and cancel functions
m.activeMu.Lock()
for id, cancel := range m.cancels {
cancel()
delete(m.cancels, id)
}
m.active = make(map[string]*Task)
m.activeMu.Unlock()
}

View file

@ -0,0 +1,123 @@
package engine
import (
"context"
"sync"
"testing"
"time"
"github.com/torrentclaw/unarr/internal/agent"
)
// fakePersister is a disk-free taskPersister used to observe which task IDs
// the manager has added to / removed from the resume store.
type fakePersister struct {
	mu    sync.Mutex
	tasks map[string]bool
}

// newFakePersister returns a fakePersister with an empty ID set.
func newFakePersister() *fakePersister {
	return &fakePersister{tasks: make(map[string]bool)}
}

// Add marks the task's ID as present.
func (f *fakePersister) Add(t agent.Task) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.tasks[t.ID] = true
}

// Remove forgets the given ID; unknown IDs are ignored.
func (f *fakePersister) Remove(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.tasks, id)
}

// has reports whether the given ID is currently recorded.
func (f *fakePersister) has(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	present := f.tasks[id]
	return present
}
// newResumeManager builds a Manager wired to the given resume store, with a
// slow mock torrent downloader and a progress reporter whose Run loop is
// started on the returned context. The caller owns the returned cancel func.
func newResumeManager(t *testing.T, p taskPersister) (*Manager, context.Context, context.CancelFunc) {
	t.Helper()

	client := agent.NewClient("http://localhost", "test", "test")
	reporter := NewProgressReporter(client, time.Hour)

	cfg := ManagerConfig{MaxConcurrent: 2, OutputDir: t.TempDir()}
	mgr := NewManager(cfg, reporter, &slowMockDownloader{method: MethodTorrent})
	mgr.SetTaskStore(p)

	ctx, cancel := context.WithCancel(context.Background())
	go reporter.Run(ctx)

	return mgr, ctx, cancel
}
// dlTask returns a torrent download task whose ID is "task-uuid-" + id. The
// prefix keeps IDs at least 8 characters long, like real dispatch IDs: the
// engine logs task.ID[:8] in several places, so shorter IDs would panic (not a
// real-world case, since the web always sends UUIDs).
func dlTask(id string) agent.Task {
	var task agent.Task
	task.ID = "task-uuid-" + id // ≥ 8 chars like a real dispatch id
	task.InfoHash = "abc123def456abc123def456abc123def456abc1"
	task.Title = "Resume " + id
	task.PreferredMethod = "torrent"
	task.Mode = "download"
	return task
}
// TestManager_SubmitDedupes submits the same task twice and checks that the
// manager tracks it only once.
func TestManager_SubmitDedupes(t *testing.T) {
	mgr, ctx, cancel := newResumeManager(t, newFakePersister())
	defer cancel()

	task := dlTask("dup-1")
	// The second pass reuses the same id — it must not launch a second download.
	for attempt := 0; attempt < 2; attempt++ {
		mgr.Submit(ctx, task)
	}

	active := mgr.ActiveCount()
	if active != 1 {
		t.Errorf("ActiveCount = %d after duplicate submit, want 1", active)
	}

	// Stop the in-flight download and wait for its goroutine to exit.
	cancel()
	mgr.Wait()
}
// TestManager_PersistsDownloadAndRemovesOnTerminal checks both halves of the
// store lifecycle: a submitted download is recorded, and a user cancel (a
// genuine terminal, not a shutdown) removes it again.
func TestManager_PersistsDownloadAndRemovesOnTerminal(t *testing.T) {
	store := newFakePersister()
	mgr, ctx, cancel := newResumeManager(t, store)
	defer cancel()

	task := dlTask("t1")
	mgr.Submit(ctx, task)

	persisted := store.has(task.ID)
	if !persisted {
		t.Fatal("download not persisted to the resume store on submit")
	}

	// A genuine terminal (user cancel, not shutdown) must remove it.
	mgr.CancelTask(task.ID)
	mgr.Wait()

	stillThere := store.has(task.ID)
	if stillThere {
		t.Error("task still in resume store after a genuine terminal — should be removed")
	}
}
// TestManager_KeepsStoreEntryOnShutdown checks that a download interrupted by
// Manager.Shutdown stays in the resume store, so it can be re-submitted on the
// next start.
func TestManager_KeepsStoreEntryOnShutdown(t *testing.T) {
	store := newFakePersister()
	mgr, ctx, cancel := newResumeManager(t, store)
	defer cancel()

	task := dlTask("s1")
	mgr.Submit(ctx, task)
	if persisted := store.has(task.ID); !persisted {
		t.Fatal("download not persisted on submit")
	}

	// Shutdown cancels the task contexts itself and then waits, so by the time
	// it returns the interrupted task's recordFinished has already run (and
	// must have skipped the removal because shuttingDown is set). No sleep or
	// polling is needed before asserting.
	shutdownCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()
	mgr.Shutdown(shutdownCtx)

	if survived := store.has(task.ID); !survived {
		t.Error("task removed from resume store on shutdown — it would not resume")
	}
}
// TestManager_DoesNotPersistStreamTasks checks that a task submitted with
// Mode "stream" is never written to the resume store.
func TestManager_DoesNotPersistStreamTasks(t *testing.T) {
	store := newFakePersister()
	mgr, ctx, cancel := newResumeManager(t, store)
	defer cancel()

	streamTask := dlTask("stream-1")
	streamTask.Mode = "stream"
	mgr.Submit(ctx, streamTask)

	persisted := store.has(streamTask.ID)
	if persisted {
		t.Error("stream task persisted to resume store — only downloads should be")
	}

	// Stop the in-flight task and wait for its goroutine to exit.
	cancel()
	mgr.Wait()
}