From 6adf1e2c4c20414f1de73acc51871d2149f9d274 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 5 May 2026 20:35:08 +0200 Subject: [PATCH 01/88] feat(mediaserver): Plex/Jellyfin/Emby auto-refresh + .strm instant mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sprint 1 — Auto-refresh after download: - New [[mediaserver]] TOML section with kind/url/token/sections - mediaserver.Refresh() fans out to Plex (partial via section ID auto-mapping from file path prefix) and Jellyfin/Emby (full library scan) - Manager.OnFinalized callback wired in daemon to trigger refresh after organize() completes — keeps engine package free of mediaserver dep - New unarr mediaserver {setup,list,remove,test} commands - unarr init wizard offers to configure refresh when a server is detected Sprint 2 — .strm instant mode (cloud + agent): - Mode strm-to-library handled in daemon dispatch: writes a one-line .strm file pointing to the cloud-resolved debrid HTTPS URL, then triggers refresh - engine.WriteStrm + StrmDestForTask mirror organize()'s naming so Plex/Jellyfin see the expected folder structure (Movies/Title (Year)/, TV Shows/Show/Season XX/) - Atomic write (temp + rename) so partial files never get indexed - Reports completed/failed status to the cloud via existing agent client --- internal/cmd/daemon.go | 25 +- internal/cmd/init.go | 28 +++ internal/cmd/mediaserver.go | 335 +++++++++++++++++++++++++++ internal/cmd/root.go | 4 + internal/cmd/strm_handler.go | 70 ++++++ internal/config/config.go | 18 +- internal/engine/manager.go | 9 + internal/engine/strm.go | 130 +++++++++++ internal/engine/task.go | 7 + internal/mediaserver/detect.go | 18 +- internal/mediaserver/detect_test.go | 8 +- internal/mediaserver/refresh.go | 280 ++++++++++++++++++++++ internal/mediaserver/refresh_test.go | 149 ++++++++++++ 13 files changed, 1065 insertions(+), 16 deletions(-) create mode 100644 internal/cmd/mediaserver.go create mode 100644 internal/cmd/strm_handler.go create mode 100644 internal/engine/strm.go create mode 100644 internal/mediaserver/refresh.go create mode 100644 internal/mediaserver/refresh_test.go diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index b8db356..a486bff 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -17,6 +17,7 @@ import ( "github.com/torrentclaw/unarr/internal/config" "github.com/torrentclaw/unarr/internal/engine" "github.com/torrentclaw/unarr/internal/library" + "github.com/torrentclaw/unarr/internal/mediaserver" "github.com/torrentclaw/unarr/internal/usenet/download" ) @@ -237,10 +238,28 @@ func runDaemonStart() error { // Trigger immediate sync when a download slot frees up manager.OnTaskDone = func() { d.TriggerSync() } + // Trigger Plex/Jellyfin/Emby library refresh after a task finalises so + // the new file appears in the user's library within seconds (instead + // of waiting for the next periodic scan). No-op if no servers + // configured. Errors are logged inside Refresh and never propagate. + if len(cfg.MediaServers) > 0 { + manager.OnFinalized = func(task *engine.Task) { + if task == nil { + return + } + fp := task.SafeFilePath() + if fp == "" { + return + } + mediaserver.Refresh(cfg.MediaServers, fp) + } + } + // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { - if t.Mode == "stream" { + switch t.Mode { + case "stream": if isStreamingTask(t.ID) { continue } @@ -251,7 +270,9 @@ func runDaemonStart() error { streamRegistry.cancels[t.ID] = streamCancel streamRegistry.mu.Unlock() go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv) - } else { + case "strm-to-library": + go handleStrmToLibrary(ctx, t, cfg, agentClient) + default: manager.Submit(ctx, t) } } diff --git a/internal/cmd/init.go b/internal/cmd/init.go index 9e7a8ca..668dc8b 100644 --- a/internal/cmd/init.go +++ b/internal/cmd/init.go @@ -296,6 +296,34 @@ func runInit(apiURLOverride string) error { } } + // ── Plex / Jellyfin / Emby refresh hook ──────────────────────── + // Offer to wire library refreshes if a media server was detected and + // none are configured yet. Skipping here is fine — the user can run + // `unarr mediaserver setup` later. + if len(detected.Servers) > 0 && len(cfg.MediaServers) == 0 { + fmt.Println() + var configureMS bool + err = huh.NewForm( + huh.NewGroup( + huh.NewConfirm(). + Title(fmt.Sprintf("Auto-refresh %s on every download?", detected.Servers[0].Name)). + Description("New downloads appear on Roku/Apple TV/etc. within seconds, instead of waiting for the next periodic library scan"). + Affirmative("Yes, configure"). + Negative("Skip (do it later with 'unarr mediaserver setup')"). + Value(&configureMS), + ), + ).Run() + if err == nil && configureMS { + fmt.Println() + if err := runMediaserverSetup(); err != nil { + color.New(color.FgYellow).Printf(" Media server setup failed: %s\n", err) + } else { + // runMediaserverSetup already saved + updated appCfg. + cfg = appCfg + } + } + } + // ── Debrid auto-detection from *arr ───────────────────────────── if resp.User.IsPro { diff --git a/internal/cmd/mediaserver.go b/internal/cmd/mediaserver.go new file mode 100644 index 0000000..a0a1a74 --- /dev/null +++ b/internal/cmd/mediaserver.go @@ -0,0 +1,335 @@ +package cmd + +import ( + "errors" + "fmt" + "strings" + + "github.com/charmbracelet/huh" + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/mediaserver" +) + +func newMediaserverCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "mediaserver", + Aliases: []string{"plex", "jellyfin"}, + Short: "Configure Plex / Jellyfin / Emby auto-refresh", + Long: `Manage the list of media servers that unarr should refresh after a +download finishes. + +When configured, unarr triggers a partial library refresh on each server +right after a download is verified and organised, so the new file shows +up in your Plex / Jellyfin / Emby (and therefore on Roku, Apple TV, Fire +TV, etc.) within seconds instead of waiting for the next periodic scan.`, + } + + cmd.AddCommand( + newMediaserverSetupCmd(), + newMediaserverListCmd(), + newMediaserverRemoveCmd(), + newMediaserverTestCmd(), + ) + + return cmd +} + +func newMediaserverSetupCmd() *cobra.Command { + return &cobra.Command{ + Use: "setup", + Short: "Interactive wizard to add a Plex / Jellyfin / Emby server", + RunE: func(cmd *cobra.Command, args []string) error { + return runMediaserverSetup() + }, + } +} + +func newMediaserverListCmd() *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List configured media servers", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured. Run 'unarr mediaserver setup' to add one.") + return nil + } + for i, s := range cfg.MediaServers { + fmt.Printf("%d. %s @ %s\n", i+1, strings.ToUpper(s.Kind), s.URL) + if len(s.Sections) > 0 { + fmt.Printf(" Sections: %v\n", s.Sections) + } + } + return nil + }, + } +} + +func newMediaserverRemoveCmd() *cobra.Command { + return &cobra.Command{ + Use: "remove", + Short: "Remove a configured media server", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured.") + return nil + } + + var options []huh.Option[int] + for i, s := range cfg.MediaServers { + label := fmt.Sprintf("%s @ %s", strings.ToUpper(s.Kind), s.URL) + options = append(options, huh.NewOption(label, i)) + } + + var idx int + err := huh.NewForm( + huh.NewGroup( + huh.NewSelect[int](). + Title("Which server to remove?"). + Options(options...). + Value(&idx), + ), + ).Run() + if err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + + cfg.MediaServers = append(cfg.MediaServers[:idx], cfg.MediaServers[idx+1:]...) + if err := config.Save(cfg, cfgFile); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + color.New(color.FgGreen).Println(" ✓ Removed.") + return nil + }, + } +} + +func newMediaserverTestCmd() *cobra.Command { + return &cobra.Command{ + Use: "test", + Short: "Trigger a refresh on each configured server (sanity check)", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured.") + return nil + } + for _, s := range cfg.MediaServers { + fmt.Printf("Refreshing %s @ %s ... ", s.Kind, s.URL) + mediaserver.Refresh([]mediaserver.ServerConfig{s}, "") + } + // Refresh fans out goroutines; give them time to log results. + fmt.Println("dispatched (errors, if any, are logged).") + return nil + }, + } +} + +// runMediaserverSetup walks the user through adding a single media server. +// Auto-detects local Plex/Jellyfin/Emby via port scan and prefills as much +// as possible. +func runMediaserverSetup() error { + if !isTerminal() { + return fmt.Errorf("interactive mode requires a terminal") + } + + bold := color.New(color.Bold) + cyan := color.New(color.FgCyan) + dim := color.New(color.FgHiBlack) + green := color.New(color.FgGreen) + + cfg := loadConfig() + + fmt.Println() + bold.Println(" Add a media server") + fmt.Println() + dim.Println(" unarr will hit the server's refresh API after each download,") + dim.Println(" so new files appear in your library within seconds.") + fmt.Println() + + detected := mediaserver.Detect() + + // Pick kind + var kind string + kindOpts := []huh.Option[string]{ + huh.NewOption("Plex", "plex"), + huh.NewOption("Jellyfin", "jellyfin"), + huh.NewOption("Emby", "emby"), + } + // Default selection: first detected server's kind, lower-cased. + if len(detected.Servers) > 0 { + kind = strings.ToLower(detected.Servers[0].Name) + } else { + kind = "plex" + } + + if err := huh.NewForm(huh.NewGroup( + huh.NewSelect[string](). + Title("Server type"). + Options(kindOpts...). + Value(&kind), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + + // Prefill URL from detection if available + url := "" + for _, s := range detected.Servers { + if strings.EqualFold(s.Name, kind) { + url = s.URL + break + } + } + if url == "" { + url = defaultURLFor(kind) + } + + if err := huh.NewForm(huh.NewGroup( + huh.NewInput(). + Title("Server URL"). + Description("Reachable from this machine — e.g. http://localhost:32400"). + Value(&url). + Validate(func(s string) error { + s = strings.TrimSpace(s) + if s == "" { + return fmt.Errorf("URL is required") + } + if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { + return fmt.Errorf("must start with http:// or https://") + } + return nil + }), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + url = strings.TrimRight(strings.TrimSpace(url), "/") + + // Token entry + token := "" + if kind == "plex" { + // Try local Preferences.xml first (works when Plex runs on same host). + if t := mediaserver.LocalPlexToken(); t != "" { + cyan.Println(" ✓ Found Plex token in local Preferences.xml") + fmt.Println() + useLocal := true + _ = huh.NewForm(huh.NewGroup( + huh.NewConfirm(). + Title("Use the auto-detected token?"). + Affirmative("Yes"). + Negative("No, paste a different one"). + Value(&useLocal), + )).Run() + if useLocal { + token = t + } + } + } + + if token == "" { + title := "API key" + desc := "" + switch kind { + case "plex": + title = "Plex token" + desc = "Get it via Plex web UI → any item → ⋯ → Get Info → View XML → copy ?X-Plex-Token=... from URL" + case "jellyfin": + title = "Jellyfin API key" + desc = "Dashboard → Advanced → API Keys → New API Key" + case "emby": + title = "Emby API key" + desc = "Server Dashboard → API Keys → New Application" + } + if err := huh.NewForm(huh.NewGroup( + huh.NewInput(). + Title(title). + Description(desc). + Value(&token). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return fmt.Errorf("token is required") + } + return nil + }), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + } + token = strings.TrimSpace(token) + + // Save + newServer := mediaserver.ServerConfig{ + Kind: kind, + URL: url, + Token: token, + } + + // Replace if same kind+URL already present, else append + replaced := false + for i, existing := range cfg.MediaServers { + if strings.EqualFold(existing.Kind, kind) && existing.URL == url { + cfg.MediaServers[i] = newServer + replaced = true + break + } + } + if !replaced { + cfg.MediaServers = append(cfg.MediaServers, newServer) + } + + if err := config.Save(cfg, cfgFile); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + + fmt.Println() + if replaced { + green.Printf(" ✓ Updated %s @ %s\n", strings.ToUpper(kind), url) + } else { + green.Printf(" ✓ Added %s @ %s\n", strings.ToUpper(kind), url) + } + fmt.Println() + + // Sanity test + doTest := true + _ = huh.NewForm(huh.NewGroup( + huh.NewConfirm(). + Title("Trigger a test refresh now?"). + Affirmative("Yes"). + Negative("Skip"). + Value(&doTest), + )).Run() + if doTest { + mediaserver.Refresh([]mediaserver.ServerConfig{newServer}, "") + fmt.Println(" Refresh dispatched. If it failed, the error is in the logs.") + } + + return nil +} + +func defaultURLFor(kind string) string { + switch strings.ToLower(kind) { + case "plex": + return "http://localhost:32400" + case "jellyfin": + return "http://localhost:8096" + case "emby": + return "http://localhost:8920" + } + return "" +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index b9b3d65..39f5e92 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -117,6 +117,9 @@ Source: https://github.com/torrentclaw/unarr`, scanCmd := newScanCmd() scanCmd.GroupID = "search" + mediaserverCmd := newMediaserverCmd() + mediaserverCmd.GroupID = "start" + rootCmd.AddCommand( // Getting Started initCmd, @@ -146,6 +149,7 @@ Source: https://github.com/torrentclaw/unarr`, completionCmd, // Library scanCmd, + mediaserverCmd, // Alias: upgrade → self-update newUpgradeCmd(), ) diff --git a/internal/cmd/strm_handler.go b/internal/cmd/strm_handler.go new file mode 100644 index 0000000..926d703 --- /dev/null +++ b/internal/cmd/strm_handler.go @@ -0,0 +1,70 @@ +package cmd + +import ( + "context" + "log" + + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/engine" + "github.com/torrentclaw/unarr/internal/mediaserver" +) + +// handleStrmToLibrary processes a Mode="strm-to-library" task by writing a +// one-line .strm file to the user's media library and triggering a +// Plex/Jellyfin/Emby refresh. No actual download happens; the .strm points +// at the cloud-resolved debrid HTTPS URL, and the media server streams from +// there when the user presses play. +// +// Reports completion (or failure) back to the cloud via the agent client. +func handleStrmToLibrary(ctx context.Context, t agent.Task, cfg config.Config, agentClient *agent.Client) { + short := agent.ShortID(t.ID) + + if t.DirectURL == "" { + log.Printf("[%s] strm-to-library: missing directUrl from server", short) + reportStrmFailure(ctx, agentClient, t.ID, "missing directUrl") + return + } + + organizeCfg := engine.OrganizeConfig{ + Enabled: cfg.Organize.Enabled, + MoviesDir: cfg.Organize.MoviesDir, + TVShowsDir: cfg.Organize.TVShowsDir, + OutputDir: cfg.Download.Dir, + } + + finalPath, err := engine.WriteStrm(t, organizeCfg) + if err != nil { + log.Printf("[%s] strm-to-library write failed: %v", short, err) + reportStrmFailure(ctx, agentClient, t.ID, err.Error()) + return + } + + log.Printf("[%s] strm-to-library wrote %s", short, finalPath) + + // Trigger media-server refresh if any are configured. Errors are logged + // inside Refresh and never propagate — the .strm is on disk, so the + // next periodic scan would pick it up regardless. + if len(cfg.MediaServers) > 0 { + mediaserver.Refresh(cfg.MediaServers, finalPath) + } + + if _, reportErr := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: t.ID, + Status: "completed", + Progress: 100, + FilePath: finalPath, + }); reportErr != nil { + log.Printf("[%s] strm-to-library: status report failed: %v", short, reportErr) + } +} + +func reportStrmFailure(ctx context.Context, agentClient *agent.Client, taskID, msg string) { + if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: taskID, + Status: "failed", + ErrorMessage: msg, + }); err != nil { + log.Printf("[%s] strm failure report failed: %v", agent.ShortID(taskID), err) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 5c593d5..038e1e2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,18 +9,20 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/torrentclaw/unarr/internal/mediaserver" ) // Config holds all persistent CLI configuration. type Config struct { - Auth AuthConfig `toml:"auth"` - Agent AgentConfig `toml:"agent"` - Download DownloadConfig `toml:"downloads"` - Organize OrganizeConfig `toml:"organize"` - Daemon DaemonConfig `toml:"daemon"` - Notifications NotificationsConfig `toml:"notifications"` - General GeneralConfig `toml:"general"` - Library LibraryConfig `toml:"library"` + Auth AuthConfig `toml:"auth"` + Agent AgentConfig `toml:"agent"` + Download DownloadConfig `toml:"downloads"` + Organize OrganizeConfig `toml:"organize"` + Daemon DaemonConfig `toml:"daemon"` + Notifications NotificationsConfig `toml:"notifications"` + General GeneralConfig `toml:"general"` + Library LibraryConfig `toml:"library"` + MediaServers []mediaserver.ServerConfig `toml:"mediaserver"` } type AuthConfig struct { diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 2a07b6f..83e4b08 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -33,6 +33,12 @@ type Manager struct { // Used by the daemon to trigger an immediate sync. OnTaskDone func() + // OnFinalized is called after a task successfully transitions to + // completed (after verify + organize + optional upgrade replace). + // Used by the daemon to trigger media-server library refreshes. + // Not invoked on failure. + OnFinalized func(task *Task) + // recentlyFinished holds tasks that completed/failed since the last sync read. // The sync goroutine reads and clears this to include final states in the next sync. recentMu sync.Mutex @@ -444,6 +450,9 @@ func (m *Manager) finalize(ctx context.Context, task *Task, result *Result) { if m.cfg.Notifications { desktopNotify("Download complete", task.Title) } + if m.OnFinalized != nil { + m.OnFinalized(task) + } m.recordFinished(task.ToStatusUpdate()) m.reporter.ReportFinal(ctx, task) } diff --git a/internal/engine/strm.go b/internal/engine/strm.go new file mode 100644 index 0000000..0961fd9 --- /dev/null +++ b/internal/engine/strm.go @@ -0,0 +1,130 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/torrentclaw/unarr/internal/agent" +) + +// StrmDest holds the resolved location for a .strm file to be written. +type StrmDest struct { + Dir string // directory the .strm file lives in (created if missing) + FileName string // filename including the .strm extension + FullPath string // Dir + FileName, joined +} + +// StrmDestForTask computes where a .strm file should land for the given +// agent task, mirroring organize()'s naming so Plex/Jellyfin sees the same +// folder structure as a real download would have produced. +// +// Returns an error if cfg lacks the relevant library directory. +func StrmDestForTask(task agent.Task, cfg OrganizeConfig) (StrmDest, error) { + switch { + case task.ContentType == "show": + if cfg.TVShowsDir == "" { + return StrmDest{}, fmt.Errorf("strm: TVShowsDir not configured") + } + showName := task.ContentTitle + if showName == "" { + showName = cleanTitle(task.Title) + } + dir := filepath.Join(cfg.TVShowsDir, sanitizePath(showName)) + var fileName string + if task.Season != nil { + dir = filepath.Join(dir, fmt.Sprintf("Season %02d", *task.Season)) + if task.Episode != nil { + fileName = fmt.Sprintf("%s - S%02dE%02d.strm", + sanitizePath(showName), *task.Season, *task.Episode) + } + } + if fileName == "" { + // Missing season/episode metadata — fall back to a sanitised title. + fileName = sanitizePath(showName) + ".strm" + } + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + case task.CollectionName != "" && cfg.MoviesDir != "": + movieName := task.ContentTitle + if movieName == "" { + movieName = cleanTitle(task.Title) + } + year := strYear(task) + base := sanitizePath(movieName) + if year != "" { + base = fmt.Sprintf("%s (%s)", base, year) + } + dir := filepath.Join(cfg.MoviesDir, sanitizePath(task.CollectionName), base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + case task.ContentType == "movie": + if cfg.MoviesDir == "" { + return StrmDest{}, fmt.Errorf("strm: MoviesDir not configured") + } + movieName := task.ContentTitle + if movieName == "" { + movieName = cleanTitle(task.Title) + } + year := strYear(task) + base := sanitizePath(movieName) + if year != "" { + base = fmt.Sprintf("%s (%s)", base, year) + } + dir := filepath.Join(cfg.MoviesDir, base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + default: + // No metadata at all — drop into MoviesDir under a sanitised title so + // at least the file lands somewhere a library scan might find it. + if cfg.MoviesDir == "" { + return StrmDest{}, fmt.Errorf("strm: no library dir configured for content without metadata") + } + base := sanitizePath(cleanTitle(task.Title)) + if base == "" { + base = "Unknown" + } + dir := filepath.Join(cfg.MoviesDir, base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + } +} + +// WriteStrm writes a .strm file containing the given URL at the destination +// computed from the task. Creates parent dirs as needed. Atomic write +// (temp + rename) so a partial file never gets indexed. +func WriteStrm(task agent.Task, cfg OrganizeConfig) (string, error) { + if task.DirectURL == "" { + return "", fmt.Errorf("strm: task has no directUrl") + } + + dest, err := StrmDestForTask(task, cfg) + if err != nil { + return "", err + } + + if err := os.MkdirAll(dest.Dir, 0o755); err != nil { + return "", fmt.Errorf("strm: create dir: %w", err) + } + + tmp := dest.FullPath + ".tmp" + if err := os.WriteFile(tmp, []byte(strings.TrimSpace(task.DirectURL)+"\n"), 0o644); err != nil { + return "", fmt.Errorf("strm: write temp: %w", err) + } + if err := os.Rename(tmp, dest.FullPath); err != nil { + _ = os.Remove(tmp) + return "", fmt.Errorf("strm: rename: %w", err) + } + return dest.FullPath, nil +} + +// strYear is the agent.Task counterpart to organize.go's resolveYear. +func strYear(task agent.Task) string { + if task.ContentYear != nil && *task.ContentYear > 0 { + return fmt.Sprintf("%d", *task.ContentYear) + } + return yearRegex.FindString(task.Title) +} diff --git a/internal/engine/task.go b/internal/engine/task.go index ceba6c9..36d3249 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -155,6 +155,13 @@ func (t *Task) GetStreamURL() string { return t.StreamURL } +// SafeFilePath returns the task's final file path thread-safely. +func (t *Task) SafeFilePath() string { + t.mu.RLock() + defer t.mu.RUnlock() + return t.FilePath +} + // UpdateProgress updates download metrics thread-safely. func (t *Task) UpdateProgress(p Progress) { t.mu.Lock() diff --git a/internal/mediaserver/detect.go b/internal/mediaserver/detect.go index e0b3030..039c363 100644 --- a/internal/mediaserver/detect.go +++ b/internal/mediaserver/detect.go @@ -87,6 +87,17 @@ func Detect() DetectedPaths { // ── Plex ──────────────────────────────────────────────────────────── +// LocalPlexToken returns the Plex auth token from the local Plex config +// directory, if Plex Media Server is installed on this host. Returns "" +// when Plex isn't installed or the token can't be read. +func LocalPlexToken() string { + dir := plexConfigDir() + if dir == "" { + return "" + } + return PlexTokenFromPrefs(filepath.Join(dir, "Preferences.xml")) +} + func plexLibraryPaths() []string { configDir := plexConfigDir() if configDir == "" { @@ -95,7 +106,7 @@ func plexLibraryPaths() []string { // Read token from Preferences.xml prefsPath := filepath.Join(configDir, "Preferences.xml") - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token == "" { return nil } @@ -154,7 +165,10 @@ type plexPrefs struct { PlexOnlineToken string `xml:"PlexOnlineToken,attr"` } -func plexTokenFromPrefs(path string) string { +// PlexTokenFromPrefs reads the Plex auth token from a Preferences.xml file. +// Returns "" if the file can't be read or parsed. Used by the setup wizard +// when configuring a Plex server running on the same host as unarr. +func PlexTokenFromPrefs(path string) string { data, err := os.ReadFile(path) if err != nil { return "" diff --git a/internal/mediaserver/detect_test.go b/internal/mediaserver/detect_test.go index 19ba53c..9887093 100644 --- a/internal/mediaserver/detect_test.go +++ b/internal/mediaserver/detect_test.go @@ -79,7 +79,7 @@ func TestPlexTokenFromPrefs(t *testing.T) { ` os.WriteFile(prefsPath, []byte(xml), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "my-secret-token" { t.Errorf("token = %q, want my-secret-token", token) } @@ -91,14 +91,14 @@ func TestPlexTokenFromPrefs(t *testing.T) { xml := `` os.WriteFile(prefsPath, []byte(xml), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "" { t.Errorf("token = %q, want empty", token) } }) t.Run("file not found", func(t *testing.T) { - token := plexTokenFromPrefs("/nonexistent/Preferences.xml") + token := PlexTokenFromPrefs("/nonexistent/Preferences.xml") if token != "" { t.Errorf("token = %q, want empty", token) } @@ -109,7 +109,7 @@ func TestPlexTokenFromPrefs(t *testing.T) { prefsPath := filepath.Join(dir, "Preferences.xml") os.WriteFile(prefsPath, []byte("not xml at all"), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "" { t.Errorf("token = %q, want empty", token) } diff --git a/internal/mediaserver/refresh.go b/internal/mediaserver/refresh.go new file mode 100644 index 0000000..74c8637 --- /dev/null +++ b/internal/mediaserver/refresh.go @@ -0,0 +1,280 @@ +package mediaserver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +// ServerConfig describes a media server that should be refreshed after a +// download (or .strm write) finishes. Stored in unarr's config.toml under +// [[mediaserver]]. +type ServerConfig struct { + Kind string `toml:"kind"` // "plex" | "jellyfin" | "emby" + URL string `toml:"url"` // e.g. http://localhost:32400 + Token string `toml:"token"` // Plex token / Jellyfin or Emby API key + Sections []int `toml:"sections"` // optional: Plex section IDs to refresh (else auto) +} + +// Section describes a Plex library section. +type Section struct { + ID int + Title string + Locations []string +} + +// httpClient is the shared HTTP client for refresh calls. Short timeouts +// because a refresh trigger is fire-and-forget. +var httpClient = &http.Client{Timeout: 8 * time.Second} + +// plexSectionCache caches section lookups per server URL+token, so we don't +// re-fetch sections on every download. +var ( + plexSectionMu sync.RWMutex + plexSectionCache = map[string][]Section{} +) + +// Refresh fans out a refresh call to every configured media server. Errors +// are logged but never returned — a failed refresh is non-fatal because the +// download itself succeeded and the next periodic scan will pick it up. +// +// `filePath` is the path of the file that was just placed (or the .strm +// pointer); it's used to resolve the matching Plex section for partial +// refreshes. Pass "" to fall back to a full refresh. +// +// Each refresh runs in its own goroutine with a 15s timeout — the call +// returns immediately and never blocks the caller. +func Refresh(servers []ServerConfig, filePath string) { + for _, s := range servers { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := refreshOne(ctx, s, filePath); err != nil { + log.Printf("mediaserver: %s refresh failed (%s): %v", s.Kind, s.URL, err) + } + }() + } +} + +func refreshOne(ctx context.Context, s ServerConfig, filePath string) error { + switch strings.ToLower(s.Kind) { + case "plex": + return refreshPlex(ctx, s, filePath) + case "jellyfin", "emby": + return refreshJellyfin(ctx, s) + default: + return fmt.Errorf("unknown kind %q", s.Kind) + } +} + +// ── Plex ──────────────────────────────────────────────────────────── + +func refreshPlex(ctx context.Context, s ServerConfig, filePath string) error { + if s.URL == "" || s.Token == "" { + return fmt.Errorf("plex: missing url or token") + } + + sectionIDs := s.Sections + if len(sectionIDs) == 0 { + // Auto-resolve: fetch sections, pick whichever owns the file path. + sections, err := plexSections(ctx, s.URL, s.Token) + if err != nil { + return fmt.Errorf("fetch sections: %w", err) + } + if filePath != "" { + if id, ok := matchSectionByPath(sections, filePath); ok { + sectionIDs = []int{id} + } + } + if len(sectionIDs) == 0 { + // Fall back to refreshing every section. + for _, sec := range sections { + sectionIDs = append(sectionIDs, sec.ID) + } + } + } + + var firstErr error + for _, id := range sectionIDs { + if err := plexRefreshSection(ctx, s.URL, s.Token, id, filePath); err != nil { + if firstErr == nil { + firstErr = err + } + log.Printf("mediaserver: plex section %d refresh failed: %v", id, err) + } + } + return firstErr +} + +func plexRefreshSection(ctx context.Context, baseURL, token string, sectionID int, filePath string) error { + q := url.Values{} + if filePath != "" { + q.Set("path", filePath) + } + q.Set("X-Plex-Token", token) + + endpoint := fmt.Sprintf("%s/library/sections/%d/refresh?%s", + strings.TrimRight(baseURL, "/"), sectionID, q.Encode()) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return err + } + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("status %d", resp.StatusCode) +} + +// plexSections returns the cached or freshly fetched library sections for a +// Plex server. The cache lives for the agent's lifetime — Plex sections +// rarely change, so refetching on every download is wasteful. +func plexSections(ctx context.Context, baseURL, token string) ([]Section, error) { + key := baseURL + "|" + token + plexSectionMu.RLock() + cached, ok := plexSectionCache[key] + plexSectionMu.RUnlock() + if ok { + return cached, nil + } + + sections, err := fetchPlexSections(ctx, baseURL, token) + if err != nil { + return nil, err + } + + plexSectionMu.Lock() + plexSectionCache[key] = sections + plexSectionMu.Unlock() + + return sections, nil +} + +func fetchPlexSections(ctx context.Context, baseURL, token string) ([]Section, error) { + endpoint := strings.TrimRight(baseURL, "/") + "/library/sections" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Plex-Token", token) + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status %d", resp.StatusCode) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, err + } + + return parsePlexSectionsFull(body) +} + +// parsePlexSectionsFull parses the full sections response with IDs. +func parsePlexSectionsFull(body []byte) ([]Section, error) { + var container struct { + MediaContainer struct { + Directory []struct { + Key string `json:"key"` + Title string `json:"title"` + Location []struct { + Path string `json:"path"` + } `json:"Location"` + } `json:"Directory"` + } `json:"MediaContainer"` + } + if err := json.Unmarshal(body, &container); err != nil { + return nil, err + } + + var out []Section + for _, d := range container.MediaContainer.Directory { + var id int + _, _ = fmt.Sscanf(d.Key, "%d", &id) + if id == 0 { + continue + } + sec := Section{ID: id, Title: d.Title} + for _, loc := range d.Location { + if loc.Path != "" { + sec.Locations = append(sec.Locations, loc.Path) + } + } + out = append(out, sec) + } + return out, nil +} + +// matchSectionByPath returns the ID of the section whose Locations contain +// (as prefix) the given file path. Picks the most specific (longest) match. +func matchSectionByPath(sections []Section, filePath string) (int, bool) { + bestID := 0 + bestLen := 0 + for _, s := range sections { + for _, loc := range s.Locations { + loc = strings.TrimRight(loc, "/") + if loc == "" { + continue + } + if filePath == loc || strings.HasPrefix(filePath, loc+"/") { + if len(loc) > bestLen { + bestLen = len(loc) + bestID = s.ID + } + } + } + } + return bestID, bestID != 0 +} + +// ── Jellyfin / Emby ───────────────────────────────────────────────── + +func refreshJellyfin(ctx context.Context, s ServerConfig) error { + if s.URL == "" || s.Token == "" { + return fmt.Errorf("%s: missing url or token", s.Kind) + } + + endpoint := strings.TrimRight(s.URL, "/") + "/Library/Refresh" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil) + if err != nil { + return err + } + // Both Jellyfin and Emby accept this header. + req.Header.Set("X-Emby-Token", s.Token) + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("status %d", resp.StatusCode) +} diff --git a/internal/mediaserver/refresh_test.go b/internal/mediaserver/refresh_test.go new file mode 100644 index 0000000..08a8d42 --- /dev/null +++ b/internal/mediaserver/refresh_test.go @@ -0,0 +1,149 @@ +package mediaserver + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +func TestParsePlexSectionsFull(t *testing.T) { + body := []byte(`{ + "MediaContainer": { + "Directory": [ + { "key": "1", "title": "Movies", "Location": [{"path": "/data/media/movies"}] }, + { "key": "2", "title": "TV Shows", "Location": [{"path": "/data/media/tv"}, {"path": "/mnt/tv2"}] }, + { "key": "0", "title": "Bogus", "Location": [{"path": "/skip"}] } + ] + } + }`) + + sections, err := parsePlexSectionsFull(body) + if err != nil { + t.Fatalf("parsePlexSectionsFull error: %v", err) + } + if len(sections) != 2 { + t.Fatalf("got %d sections, want 2 (id=0 should be skipped)", len(sections)) + } + if sections[0].ID != 1 || sections[0].Title != "Movies" { + t.Errorf("section[0] = %+v", sections[0]) + } + if sections[1].ID != 2 || len(sections[1].Locations) != 2 { + t.Errorf("section[1] = %+v", sections[1]) + } +} + +func TestMatchSectionByPath(t *testing.T) { + sections := []Section{ + {ID: 1, Title: "Movies", Locations: []string{"/data/media/movies"}}, + {ID: 2, Title: "TV", Locations: []string{"/data/media/tv"}}, + {ID: 3, Title: "TV-HD", Locations: []string{"/data/media/tv/hd"}}, + } + + tests := []struct { + path string + wantID int + wantOK bool + }{ + {"/data/media/movies/Inception (2010)/Inception.mkv", 1, true}, + {"/data/media/tv/Show/Season 01/ep.mkv", 2, true}, + {"/data/media/tv/hd/Show/Season 01/ep.mkv", 3, true}, // most specific wins + {"/data/media/movies", 1, true}, // exact + {"/elsewhere/foo.mkv", 0, false}, + } + for _, tc := range tests { + gotID, gotOK := matchSectionByPath(sections, tc.path) + if gotID != tc.wantID || gotOK != tc.wantOK { + t.Errorf("matchSectionByPath(%q) = (%d,%v), want (%d,%v)", + tc.path, gotID, gotOK, tc.wantID, tc.wantOK) + } + } +} + +func TestRefreshPlex_PartialRefreshWithPath(t *testing.T) { + var refreshHits int32 + var gotPath, gotToken string + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == "/library/sections": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "MediaContainer": { + "Directory": [ + { "key": "7", "title": "Movies", "Location": [{"path": "/m"}] } + ] + } + }`)) + case strings.HasPrefix(r.URL.Path, "/library/sections/7/refresh"): + atomic.AddInt32(&refreshHits, 1) + gotPath = r.URL.Query().Get("path") + gotToken = r.URL.Query().Get("X-Plex-Token") + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer srv.Close() + + // Reset cache so the test server is hit fresh. + plexSectionMu.Lock() + plexSectionCache = map[string][]Section{} + plexSectionMu.Unlock() + + cfg := ServerConfig{Kind: "plex", URL: srv.URL, Token: "tk-1"} + Refresh([]ServerConfig{cfg}, "/m/Inception/Inception.mkv") + + // Refresh fans out goroutines — give it time. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&refreshHits) > 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + + if atomic.LoadInt32(&refreshHits) != 1 { + t.Fatalf("refresh endpoint hit %d times, want 1", refreshHits) + } + if gotPath != "/m/Inception/Inception.mkv" { + t.Errorf("path query = %q", gotPath) + } + if gotToken != "tk-1" { + t.Errorf("token query = %q", gotToken) + } +} + +func TestRefreshJellyfin(t *testing.T) { + var hits int32 + var gotToken string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == "/Library/Refresh" { + atomic.AddInt32(&hits, 1) + gotToken = r.Header.Get("X-Emby-Token") + w.WriteHeader(http.StatusNoContent) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer srv.Close() + + cfg := ServerConfig{Kind: "jellyfin", URL: srv.URL, Token: "jf-key"} + Refresh([]ServerConfig{cfg}, "") + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&hits) > 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + if atomic.LoadInt32(&hits) != 1 { + t.Fatalf("Jellyfin hits = %d, want 1", hits) + } + if gotToken != "jf-key" { + t.Errorf("X-Emby-Token = %q", gotToken) + } +} From f6117ddeb9e34bde9e015791e875d8d7014edb8e Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 08:59:58 +0200 Subject: [PATCH 02/88] =?UTF-8?q?feat(torrent):=20act=20as=20WebTorrent=20?= =?UTF-8?q?peer=20for=20browser=20=E2=86=94=20unarr=20P2P=20streaming?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires anacrolix/torrent's built-in webtorrent package so a browser running webtorrent.js can fetch pieces from this CLI via WebRTC data channels. The daemon stays the seeder; we never relay bytes through TorrentClaw infrastructure — same legal posture as today. Changes: - internal/config: new [downloads.webrtc] section (enabled/trackers/stun_servers/turn_servers/turn_user/turn_pass). Disabled by default, opt-in via config.toml. When enabled but trackers / STUN slices are empty, defaults are reapplied on Load() so users get a working setup with a single `enabled = true`. - internal/engine: TorrentConfig gains WebRTCEnabled / WebRTCTrackers / ICEServers; NewTorrentDownloader populates ClientConfig.ICEServerList and forces NoUpload=false when WebRTC is on (browsers can't pull otherwise). buildMagnet now accepts variadic extra trackers and the downloader method prepends WSS trackers so anacrolix's webtorrent.TrackerClient picks them up first. - internal/engine/webrtc.go: BuildICEServers helper converts the TOML WebRTCConfig into []webrtc.ICEServer with shared TURN credentials. - internal/cmd/daemon.go + download.go: pass WebRTC config through to the engine. Tests (8 new, all green; full suite 0 lint issues, 0 vet): - buildMagnet free function: defaults-only, with extras, trim+empty-skip - downloader method: WebRTC disabled keeps WSS out, enabled prepends them - BuildICEServers: nil when disabled, STUN-only path, TURN+credentials - NewTorrentDownloader: full WebRTC-enabled construction (logs WebRTC peer enabled, magnet contains wss://tracker.torrentclaw.com) End-to-end smoke (browser ↔ unarr peer transfer) is deferred to a manual test once tracker.torrentclaw.com WSS is live. --- internal/cmd/daemon.go | 3 + internal/cmd/download.go | 3 + internal/config/config.go | 52 ++++++++-- internal/engine/torrent.go | 52 +++++++++- internal/engine/webrtc.go | 36 +++++++ internal/engine/webrtc_test.go | 177 +++++++++++++++++++++++++++++++++ 6 files changed, 310 insertions(+), 13 deletions(-) create mode 100644 internal/engine/webrtc.go create mode 100644 internal/engine/webrtc_test.go diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index b8db356..46059fd 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -189,6 +189,9 @@ func runDaemonStart() error { MaxUploadRate: maxUl, ListenPort: cfg.Download.ListenPort, SeedEnabled: false, + WebRTCEnabled: cfg.Download.WebRTC.Enabled, + WebRTCTrackers: cfg.Download.WebRTC.Trackers, + ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), }) if err != nil { return fmt.Errorf("create torrent downloader: %w", err) diff --git a/internal/cmd/download.go b/internal/cmd/download.go index bd5ceab..5189166 100644 --- a/internal/cmd/download.go +++ b/internal/cmd/download.go @@ -114,6 +114,9 @@ func runDownloadWithDeps(input, method string, deps downloadDeps) error { StallTimeout: 10 * time.Minute, MaxTimeout: 0, // unlimited SeedEnabled: false, + WebRTCEnabled: cfg.Download.WebRTC.Enabled, + WebRTCTrackers: cfg.Download.WebRTC.Trackers, + ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), }) if err != nil { return fmt.Errorf("create downloader: %w", err) diff --git a/internal/config/config.go b/internal/config/config.go index 5c593d5..cb53280 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,16 +34,30 @@ type AgentConfig struct { } type DownloadConfig struct { - Dir string `toml:"dir"` - PreferredMethod string `toml:"preferred_method"` - PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection - MaxConcurrent int `toml:"max_concurrent"` - MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited - MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited - MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0") - StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m") - ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random) - StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) + Dir string `toml:"dir"` + PreferredMethod string `toml:"preferred_method"` + PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection + MaxConcurrent int `toml:"max_concurrent"` + MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited + MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited + MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0") + StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m") + ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random) + StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) + WebRTC WebRTCConfig `toml:"webrtc"` +} + +// WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers +// can fetch pieces via WebRTC data channels — required by the in-browser +// player on torrentclaw.com. Disabled by default; enabling implies upload +// is allowed for active torrents (browsers can't download otherwise). +type WebRTCConfig struct { + Enabled bool `toml:"enabled"` // master switch + Trackers []string `toml:"trackers"` // wss:// signaling trackers + STUNServers []string `toml:"stun_servers"` // stun:host:port + TURNServers []string `toml:"turn_servers"` // turn:host:port (no auth) — see TURNCredentials for authed + TURNUser string `toml:"turn_user"` // optional, applied to all TURNServers + TURNPass string `toml:"turn_pass"` // optional } type OrganizeConfig struct { @@ -86,6 +100,11 @@ func Default() Config { PreferredMethod: "auto", MaxConcurrent: 3, StreamPort: 11818, + WebRTC: WebRTCConfig{ + Enabled: false, + Trackers: []string{"wss://tracker.torrentclaw.com"}, + STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"}, + }, }, Organize: OrganizeConfig{ Enabled: true, @@ -144,6 +163,19 @@ func Load(path string) (Config, error) { if cfg.Download.StreamPort == 0 { cfg.Download.StreamPort = 11818 } + // Re-apply WebRTC defaults only when the user enabled WebRTC but didn't + // supply trackers/STUN — leave both empty if disabled to keep config diffs clean. + if cfg.Download.WebRTC.Enabled { + if len(cfg.Download.WebRTC.Trackers) == 0 { + cfg.Download.WebRTC.Trackers = []string{"wss://tracker.torrentclaw.com"} + } + if len(cfg.Download.WebRTC.STUNServers) == 0 { + cfg.Download.WebRTC.STUNServers = []string{ + "stun:stun.l.google.com:19302", + "stun:stun1.l.google.com:19302", + } + } + } return cfg, nil } diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index 9a916df..5b1d16d 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -16,6 +16,7 @@ import ( alog "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/storage" + "github.com/pion/webrtc/v4" "github.com/torrentclaw/unarr/internal/config" "golang.org/x/term" "golang.org/x/time/rate" @@ -70,6 +71,14 @@ type TorrentConfig struct { SeedEnabled bool SeedRatio float64 // target seed ratio (default 0, meaning seed until SeedTime) SeedTime time.Duration // min seed time after completion (default 0) + + // WebRTC peer (WebTorrent protocol) for browser ↔ unarr P2P streaming. + // When enabled, anacrolix/torrent's built-in webtorrent package handles + // the WSS signaling + WebRTC data channels. Implies upload allowed for + // every torrent in the client (browsers can't pull pieces otherwise). + WebRTCEnabled bool + WebRTCTrackers []string // wss://… signaling trackers added to every magnet + ICEServers []webrtc.ICEServer // STUN + TURN servers for NAT traversal } // TorrentDownloader downloads torrents via BitTorrent P2P. @@ -96,9 +105,27 @@ func NewTorrentDownloader(cfg TorrentConfig) (*TorrentDownloader, error) { tcfg := torrent.NewDefaultClientConfig() tcfg.DataDir = cfg.DataDir tcfg.Seed = cfg.SeedEnabled - tcfg.NoUpload = !cfg.SeedEnabled + // WebRTC peers (browsers) can only pull pieces from us if upload is + // enabled. We honour SeedEnabled for the long-tail seed-after-complete + // behaviour but unconditionally allow upload while WebRTC is on so an + // active download can still serve to a watching browser. + tcfg.NoUpload = !cfg.SeedEnabled && !cfg.WebRTCEnabled tcfg.Logger = alog.Default.FilterLevel(alog.Critical) + // WebRTC / WebTorrent peer: anacrolix auto-routes ws://+wss:// trackers + // to the bundled webtorrent.TrackerClient. We only need to populate the + // ICE server list so the SDP offers we send carry usable candidates. + if cfg.WebRTCEnabled { + tcfg.DisableWebtorrent = false + if len(cfg.ICEServers) > 0 { + tcfg.ICEServerList = cfg.ICEServers + } + log.Printf("[torrent] WebRTC peer enabled (trackers=%d ice_servers=%d)", + len(cfg.WebRTCTrackers), len(cfg.ICEServers)) + } else { + tcfg.DisableWebtorrent = true + } + // --- Performance optimizations --- // Storage: mmap instead of default file backend. @@ -235,7 +262,7 @@ func (d *TorrentDownloader) Available(_ context.Context, task *Task) (bool, erro } func (d *TorrentDownloader) Download(ctx context.Context, task *Task, outputDir string, progressCh chan<- Progress) (*Result, error) { - magnet := buildMagnet(task.InfoHash) + magnet := d.buildMagnet(task.InfoHash) t, err := d.client.AddMagnet(magnet) if err != nil { @@ -604,14 +631,33 @@ func (d *TorrentDownloader) selectFiles(t *torrent.Torrent, taskID string) (tota return totalBytes, fileName } -func buildMagnet(infoHash string) string { +// buildMagnet composes a magnet URI for the info hash. extraTrackers (e.g. +// wss://… for WebRTC peer signaling) are prepended so anacrolix's +// webtorrent.TrackerClient picks them up first; the static UDP list +// follows. Empty / whitespace entries in extraTrackers are skipped. +func buildMagnet(infoHash string, extraTrackers ...string) string { params := []string{"xt=urn:btih:" + infoHash} + for _, t := range extraTrackers { + t = strings.TrimSpace(t) + if t == "" { + continue + } + params = append(params, "tr="+url.QueryEscape(t)) + } for _, tracker := range defaultTrackers { params = append(params, "tr="+url.QueryEscape(tracker)) } return "magnet:?" + strings.Join(params, "&") } +// buildMagnet on the downloader injects its WebRTC trackers when enabled. +func (d *TorrentDownloader) buildMagnet(infoHash string) string { + if d != nil && d.cfg.WebRTCEnabled { + return buildMagnet(infoHash, d.cfg.WebRTCTrackers...) + } + return buildMagnet(infoHash) +} + func formatBytes(b int64) string { const unit = 1024 if b < unit { diff --git a/internal/engine/webrtc.go b/internal/engine/webrtc.go new file mode 100644 index 0000000..28a81a4 --- /dev/null +++ b/internal/engine/webrtc.go @@ -0,0 +1,36 @@ +package engine + +import ( + "github.com/pion/webrtc/v4" + "github.com/torrentclaw/unarr/internal/config" +) + +// BuildICEServers converts a config.WebRTCConfig into the +// []webrtc.ICEServer slice that anacrolix/torrent's webtorrent client +// needs. STUN entries become bare URLs; TURN entries inherit the shared +// TURNUser / TURNPass credentials. Returns nil when WebRTC is disabled. +func BuildICEServers(cfg config.WebRTCConfig) []webrtc.ICEServer { + if !cfg.Enabled { + return nil + } + var servers []webrtc.ICEServer + for _, s := range cfg.STUNServers { + if s == "" { + continue + } + servers = append(servers, webrtc.ICEServer{URLs: []string{s}}) + } + for _, t := range cfg.TURNServers { + if t == "" { + continue + } + entry := webrtc.ICEServer{URLs: []string{t}} + if cfg.TURNUser != "" { + entry.Username = cfg.TURNUser + entry.Credential = cfg.TURNPass + entry.CredentialType = webrtc.ICECredentialTypePassword + } + servers = append(servers, entry) + } + return servers +} diff --git a/internal/engine/webrtc_test.go b/internal/engine/webrtc_test.go new file mode 100644 index 0000000..efae41d --- /dev/null +++ b/internal/engine/webrtc_test.go @@ -0,0 +1,177 @@ +package engine + +import ( + "context" + "net/url" + "strings" + "testing" + + "github.com/pion/webrtc/v4" + "github.com/torrentclaw/unarr/internal/config" +) + +const validHash = "aaf2c71b0e0a03d3f9b2a3e1d5c6b7a8f0e1d2c3" + +// TestBuildMagnet_NoExtras verifies the legacy free-function path keeps +// emitting only the static defaultTrackers list. +func TestBuildMagnet_NoExtras(t *testing.T) { + got := buildMagnet(validHash) + if !strings.HasPrefix(got, "magnet:?xt=urn:btih:"+validHash) { + t.Fatalf("magnet missing xt: %s", got) + } + if !strings.Contains(got, url.QueryEscape("udp://tracker.opentrackr.org:1337/announce")) { + t.Fatal("expected default UDP tracker absent") + } + if strings.Contains(got, "wss%3A") { + t.Fatalf("unexpected WSS tracker leaked when none requested: %s", got) + } +} + +// TestBuildMagnet_WithExtraTrackers verifies extraTrackers (e.g. WebRTC +// WSS endpoints) are prepended before the defaults and properly URL-encoded. +func TestBuildMagnet_WithExtraTrackers(t *testing.T) { + got := buildMagnet(validHash, "wss://tracker.torrentclaw.com") + encWss := url.QueryEscape("wss://tracker.torrentclaw.com") + encUDP := url.QueryEscape("udp://tracker.opentrackr.org:1337/announce") + if !strings.Contains(got, "tr="+encWss) { + t.Fatalf("WSS tracker missing: %s", got) + } + wssIdx := strings.Index(got, encWss) + udpIdx := strings.Index(got, encUDP) + if wssIdx < 0 || udpIdx < 0 || wssIdx > udpIdx { + t.Fatalf("WSS tracker should appear BEFORE UDP defaults: wss=%d udp=%d", wssIdx, udpIdx) + } +} + +// TestBuildMagnet_TrimsAndSkipsEmpty makes sure callers passing config-derived +// slices with stray whitespace or empty strings don't get malformed magnets. +func TestBuildMagnet_TrimsAndSkipsEmpty(t *testing.T) { + got := buildMagnet(validHash, " wss://tracker.torrentclaw.com ", "", " ") + encWss := url.QueryEscape("wss://tracker.torrentclaw.com") + if !strings.Contains(got, "tr="+encWss) { + t.Fatalf("trimmed WSS tracker missing: %s", got) + } + if strings.Contains(got, "tr=&") || strings.HasSuffix(got, "tr=") { + t.Fatalf("empty tracker emitted: %s", got) + } +} + +// TestTorrentDownloader_buildMagnet_WebRTCDisabled confirms the downloader +// method does NOT inject WebRTCTrackers when WebRTCEnabled is false. +func TestTorrentDownloader_buildMagnet_WebRTCDisabled(t *testing.T) { + d := &TorrentDownloader{cfg: TorrentConfig{ + WebRTCEnabled: false, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, + }} + got := d.buildMagnet(validHash) + if strings.Contains(got, "wss%3A") { + t.Fatalf("WSS tracker leaked while WebRTCEnabled=false: %s", got) + } +} + +// TestTorrentDownloader_buildMagnet_WebRTCEnabled confirms the WSS trackers +// are present when WebRTCEnabled is true. +func TestTorrentDownloader_buildMagnet_WebRTCEnabled(t *testing.T) { + d := &TorrentDownloader{cfg: TorrentConfig{ + WebRTCEnabled: true, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com", "wss://tracker2.example.com"}, + }} + got := d.buildMagnet(validHash) + for _, want := range []string{ + "wss://tracker.torrentclaw.com", + "wss://tracker2.example.com", + } { + if !strings.Contains(got, url.QueryEscape(want)) { + t.Fatalf("expected tracker %q missing in magnet: %s", want, got) + } + } +} + +// TestBuildICEServers_DisabledReturnsNil ensures we don't leak STUN/TURN +// configuration into the torrent client when the user has WebRTC off. +func TestBuildICEServers_DisabledReturnsNil(t *testing.T) { + got := BuildICEServers(config.WebRTCConfig{ + Enabled: false, + STUNServers: []string{"stun:stun.l.google.com:19302"}, + }) + if got != nil { + t.Fatalf("expected nil ICE servers when disabled, got %+v", got) + } +} + +// TestBuildICEServers_STUNOnly converts STUN entries to bare ICEServer +// records with no credentials. +func TestBuildICEServers_STUNOnly(t *testing.T) { + got := BuildICEServers(config.WebRTCConfig{ + Enabled: true, + STUNServers: []string{"stun:stun.l.google.com:19302", "", "stun:stun1.l.google.com:19302"}, + }) + if len(got) != 2 { + t.Fatalf("expected 2 STUN servers (empty skipped), got %d (%+v)", len(got), got) + } + if got[0].URLs[0] != "stun:stun.l.google.com:19302" { + t.Fatalf("first server unexpected: %+v", got[0]) + } + if got[0].Username != "" || got[0].Credential != nil { + t.Fatalf("STUN entry should have no credentials, got %+v", got[0]) + } +} + +// TestNewTorrentDownloader_WebRTCEnabled creates a downloader with the +// WebRTC peer fully wired up and confirms the constructor doesn't error +// (anacrolix accepts the ICE server list, port binds, etc.). +func TestNewTorrentDownloader_WebRTCEnabled(t *testing.T) { + dir := t.TempDir() + dl, err := NewTorrentDownloader(TorrentConfig{ + DataDir: dir, + ListenPort: 0, // let the OS pick — avoid clashes in CI + WebRTCEnabled: true, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, + ICEServers: BuildICEServers(config.WebRTCConfig{ + Enabled: true, + STUNServers: []string{"stun:stun.l.google.com:19302"}, + }), + }) + if err != nil { + t.Fatalf("WebRTC-enabled downloader failed to start: %v", err) + } + defer func() { + if err := dl.Shutdown(context.Background()); err != nil { + t.Logf("shutdown: %v", err) + } + }() + + // Magnet for any task should now contain the WSS tracker. + got := dl.buildMagnet(validHash) + if !strings.Contains(got, "wss%3A%2F%2Ftracker.torrentclaw.com") { + t.Fatalf("WebRTC magnet missing WSS tracker: %s", got) + } +} + +// TestBuildICEServers_TURNWithCreds applies TURNUser/TURNPass to every TURN +// entry so the operator only specifies them once. +func TestBuildICEServers_TURNWithCreds(t *testing.T) { + got := BuildICEServers(config.WebRTCConfig{ + Enabled: true, + STUNServers: []string{"stun:stun.l.google.com:19302"}, + TURNServers: []string{"turn:turn.example.com:3478"}, + TURNUser: "alice", + TURNPass: "s3cr3t", + }) + if len(got) != 2 { + t.Fatalf("expected 1 STUN + 1 TURN, got %d", len(got)) + } + turn := got[1] + if turn.URLs[0] != "turn:turn.example.com:3478" { + t.Fatalf("TURN URL wrong: %+v", turn) + } + if turn.Username != "alice" { + t.Fatalf("TURN username wrong: %s", turn.Username) + } + if turn.Credential != "s3cr3t" { + t.Fatalf("TURN credential wrong: %v", turn.Credential) + } + if turn.CredentialType != webrtc.ICECredentialTypePassword { + t.Fatalf("TURN credential type wrong: %v", turn.CredentialType) + } +} From aa291320f5638ab411cc5580524caf5f8531cf14 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 09:40:37 +0200 Subject: [PATCH 03/88] test(wstracker-probe): standalone Go binary to verify WSS tracker reachability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tiny `go run ./cmd/wstracker-probe` that spins up an anacrolix/torrent Client with WebRTC enabled, advertises a random info_hash to the given WSS tracker, and reports via Callbacks.StatusUpdated whether the announce round-trip succeeded. Used as the production smoke for unarr ↔ wss://tracker.torrentclaw.com: $ /tmp/wstracker-probe -tracker wss://tracker.torrentclaw.com -timeout 30s [probe] tracker=wss://tracker.torrentclaw.com info_hash=e978df8d... timeout=30s [probe] tracker connected: wss://tracker.torrentclaw.com [probe] tracker announce OK: wss://tracker.torrentclaw.com ih=e978df8d... [probe] OK — tracker announce succeeded Disables TCP/uTP/DHT/IPv6/UPnP — only the WS tracker path matters here. Exit codes: 0 success, 1 announce error, 2 timeout. --- cmd/wstracker-probe/main.go | 117 ++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 cmd/wstracker-probe/main.go diff --git a/cmd/wstracker-probe/main.go b/cmd/wstracker-probe/main.go new file mode 100644 index 0000000..660e297 --- /dev/null +++ b/cmd/wstracker-probe/main.go @@ -0,0 +1,117 @@ +// wstracker-probe — connects to a WebSocket BitTorrent tracker, advertises +// a fake info_hash, and reports whether the announce succeeds. +// +// Usage: +// +// go run ./cmd/wstracker-probe -tracker wss://tracker.torrentclaw.com +// +// Exit code 0 on TrackerAnnounceSuccessful, 1 on timeout/error. +package main + +import ( + "context" + "crypto/rand" + "flag" + "fmt" + "log" + "os" + "time" + + alog "github.com/anacrolix/log" + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/storage" + "github.com/pion/webrtc/v4" +) + +func main() { + tracker := flag.String("tracker", "wss://tracker.torrentclaw.com", "WSS tracker URL to probe") + timeout := flag.Duration("timeout", 30*time.Second, "max wait for successful announce") + flag.Parse() + + tmp, err := os.MkdirTemp("", "wstracker-probe-*") + if err != nil { + log.Fatalf("temp dir: %v", err) + } + defer os.RemoveAll(tmp) + + cfg := torrent.NewDefaultClientConfig() + cfg.DataDir = tmp + cfg.DefaultStorage = storage.NewMMap(tmp) + cfg.Seed = false + cfg.NoUpload = false + cfg.DisableTCP = true + cfg.DisableUTP = true + cfg.DisableIPv6 = true + cfg.NoDHT = true + cfg.NoDefaultPortForwarding = true + cfg.ListenPort = 0 + cfg.Logger = alog.Default.FilterLevel(alog.Critical) + cfg.DisableWebtorrent = false + cfg.ICEServerList = []webrtc.ICEServer{ + {URLs: []string{"stun:stun.l.google.com:19302"}}, + } + + annSuccess := make(chan struct{}, 1) + annError := make(chan error, 1) + cfg.Callbacks.StatusUpdated = append( + cfg.Callbacks.StatusUpdated, + func(e torrent.StatusUpdatedEvent) { + switch e.Event { //nolint:exhaustive // peer events are noise for tracker probe + case torrent.TrackerConnected: + if e.Error != nil { + fmt.Printf("[probe] tracker connect FAILED: %v\n", e.Error) + } else { + fmt.Printf("[probe] tracker connected: %s\n", e.Url) + } + case torrent.TrackerAnnounceSuccessful: + fmt.Printf("[probe] tracker announce OK: %s ih=%s\n", e.Url, e.InfoHash) + select { + case annSuccess <- struct{}{}: + default: + } + case torrent.TrackerAnnounceError: + fmt.Printf("[probe] tracker announce ERROR: %s ih=%s err=%v\n", e.Url, e.InfoHash, e.Error) + select { + case annError <- e.Error: + default: + } + case torrent.TrackerDisconnected: + fmt.Printf("[probe] tracker disconnected: %s err=%v\n", e.Url, e.Error) + } + }, + ) + + client, err := torrent.NewClient(cfg) + if err != nil { + log.Fatalf("create torrent client: %v", err) + } + defer client.Close() + + var ih [20]byte + if _, err := rand.Read(ih[:]); err != nil { + log.Fatalf("random info_hash: %v", err) + } + magnet := fmt.Sprintf("magnet:?xt=urn:btih:%x&tr=%s", ih, *tracker) + fmt.Printf("[probe] tracker=%s info_hash=%x timeout=%s\n", *tracker, ih, *timeout) + + t, err := client.AddMagnet(magnet) + if err != nil { + log.Fatalf("add magnet: %v", err) + } + defer t.Drop() + + ctx, cancel := context.WithTimeout(context.Background(), *timeout) + defer cancel() + + select { + case <-annSuccess: + fmt.Println("[probe] OK — tracker announce succeeded") + os.Exit(0) + case err := <-annError: + fmt.Printf("[probe] FAIL — tracker announce error: %v\n", err) + os.Exit(1) + case <-ctx.Done(): + fmt.Printf("[probe] FAIL — timeout after %s\n", *timeout) + os.Exit(2) + } +} From 727ab19468577624ba858b97bc295f89a3c7a791 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 09:49:32 +0200 Subject: [PATCH 04/88] feat(mediainfo): ResolveFFmpeg + DownloadFFmpeg mirroring ffprobe pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the ffmpeg-binary half of the resolution stack so the upcoming WebRTC streaming transcoder (Fase 3.3) has a single point of entry. Search order matches ResolveFFprobe so operators don't need to learn a second mental model: 1. Explicit path (--ffmpeg flag / library.ffmpeg_path config) 2. FFMPEG_PATH env var 3. "ffmpeg" on PATH (system install) 4. Adjacent to the unarr executable (release tarball bundles it here — this is the preferred path; see Fase 3.2 goreleaser changes) 5. Cache dir (sibling of the cached ffprobe binary) 6. Auto-download from ffbinaries.com (~70MB) as last resort Includes: - internal/library/mediainfo/ffmpeg.go — ResolveFFmpeg + actionable Docker / non-Docker error messages - internal/library/mediainfo/ffmpeg_download.go — DownloadFFmpeg, reuses ffprobePlatformKey + ffprobeAPIClient + ffprobeDLClient + extractFromZip helpers; bumps maxZipSize to 200MB (ffmpeg static is ~70-100MB) - internal/config: LibraryConfig.FFmpegPath toml field for explicit paths - 4 unit tests: explicit OK, explicit missing, env var, sibling cache path Tarball bundling and the actual transcoding pipeline land in the next two commits. --- internal/config/config.go | 1 + internal/library/mediainfo/ffmpeg.go | 79 ++++++++++++ internal/library/mediainfo/ffmpeg_download.go | 116 ++++++++++++++++++ internal/library/mediainfo/ffmpeg_test.go | 78 ++++++++++++ 4 files changed, 274 insertions(+) create mode 100644 internal/library/mediainfo/ffmpeg.go create mode 100644 internal/library/mediainfo/ffmpeg_download.go create mode 100644 internal/library/mediainfo/ffmpeg_test.go diff --git a/internal/config/config.go b/internal/config/config.go index cb53280..bb7498c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -84,6 +84,7 @@ type LibraryConfig struct { ScanPath string `toml:"scan_path"` // remembered from last scan Workers int `toml:"workers"` // concurrent ffprobe (default 8) FFprobePath string `toml:"ffprobe_path"` // optional explicit path + FFmpegPath string `toml:"ffmpeg_path"` // optional explicit path (used by WebRTC streaming transcoder) BackupDir string `toml:"backup_dir"` // for replaced files AutoScan bool `toml:"auto_scan"` // enable daily auto-scan in daemon (default true) ScanInterval string `toml:"scan_interval"` // e.g. "24h", "12h", "6h" (default "24h") diff --git a/internal/library/mediainfo/ffmpeg.go b/internal/library/mediainfo/ffmpeg.go new file mode 100644 index 0000000..113e7c7 --- /dev/null +++ b/internal/library/mediainfo/ffmpeg.go @@ -0,0 +1,79 @@ +package mediainfo + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" +) + +// ResolveFFmpeg finds the ffmpeg binary. Search order mirrors ResolveFFprobe +// so the same operator setup works for both: +// 1. Explicit path (--ffmpeg flag / library.ffmpeg_path config) +// 2. FFMPEG_PATH env var +// 3. "ffmpeg" on PATH +// 4. Adjacent to the current executable (release tarball bundles ffmpeg +// next to the unarr binary — this is the preferred install path) +// 5. Previously downloaded in the unarr cache dir +// 6. Auto-download static binary as last resort (~50MB, slow start) +// +// ffmpeg is required for the WebRTC streaming pipeline; ffprobe alone can't +// transcode HEVC/MKV to browser-friendly H.264/MP4 fragments. +func ResolveFFmpeg(explicit string) (string, error) { + if explicit != "" { + if _, err := os.Stat(explicit); err == nil { + return explicit, nil + } + return "", fmt.Errorf("ffmpeg not found at explicit path: %s", explicit) + } + + if envPath := os.Getenv("FFMPEG_PATH"); envPath != "" { + if _, err := os.Stat(envPath); err == nil { + return envPath, nil + } + } + + if p, err := exec.LookPath("ffmpeg"); err == nil { + return p, nil + } + + if exePath, err := os.Executable(); err == nil { + name := "ffmpeg" + if runtime.GOOS == "windows" { + name = "ffmpeg.exe" + } + adjacent := filepath.Join(filepath.Dir(exePath), name) + if _, err := os.Stat(adjacent); err == nil { + return adjacent, nil + } + } + + if cached, err := FFmpegCachePath(); err == nil { + if _, err := os.Stat(cached); err == nil { + return cached, nil + } + } + + if p, err := DownloadFFmpeg(); err == nil { + return p, nil + } + + if isDocker() { + return "", fmt.Errorf( + "ffmpeg not found and auto-download failed (read-only filesystem?).\n" + + "Options:\n" + + " • Use the official image: torrentclaw/unarr (includes ffmpeg)\n" + + " • Set FFMPEG_PATH env var to point to a pre-installed ffmpeg binary\n" + + " • Add to config.toml: [library]\\nffmpeg_path = \"/path/to/ffmpeg\"", + ) + } + return "", fmt.Errorf( + "ffmpeg not found and auto-download failed.\n" + + "Options:\n" + + " • Install ffmpeg: sudo apt install ffmpeg (or brew install ffmpeg)\n" + + " • Use the unarr release tarball — ffmpeg is bundled next to the binary\n" + + " • Set FFMPEG_PATH env var to point to the ffmpeg binary\n" + + " • Add to config.toml: [library]\\nffmpeg_path = \"/path/to/ffmpeg\"", + ) +} diff --git a/internal/library/mediainfo/ffmpeg_download.go b/internal/library/mediainfo/ffmpeg_download.go new file mode 100644 index 0000000..6d4f81c --- /dev/null +++ b/internal/library/mediainfo/ffmpeg_download.go @@ -0,0 +1,116 @@ +package mediainfo + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "runtime" +) + +const maxFFmpegZipSize = 200 * 1024 * 1024 // 200MB — ffmpeg static is ~70-100MB compressed + +// FFmpegCachePath returns the full path to the cached ffmpeg binary +// (sibling of the cached ffprobe binary). +func FFmpegCachePath() (string, error) { + dir, err := FFprobeCacheDir() + if err != nil { + return "", err + } + name := "ffmpeg" + if runtime.GOOS == "windows" { + name = "ffmpeg.exe" + } + return filepath.Join(dir, name), nil +} + +// DownloadFFmpeg downloads a static ffmpeg binary for the current platform +// and caches it locally. Returns the path to the binary. Reuses +// resolveFFprobeURL's ffbinaries.com discovery endpoint — that index ships +// both ffprobe and ffmpeg per platform. +func DownloadFFmpeg() (string, error) { + dest, err := FFmpegCachePath() + if err != nil { + return "", fmt.Errorf("cannot determine cache path: %w", err) + } + + if _, err := os.Stat(dest); err == nil { + return dest, nil + } + + platform, err := ffprobePlatformKey() + if err != nil { + return "", err + } + + url, err := resolveFFmpegURL(platform) + if err != nil { + return "", err + } + + fmt.Fprintf(os.Stderr, "ffmpeg not found — downloading for %s (~70MB)...\n", platform) + + resp, err := ffprobeDLClient.Get(url) + if err != nil { + return "", fmt.Errorf("download failed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("download failed: HTTP %d", resp.StatusCode) + } + + zipData, err := io.ReadAll(io.LimitReader(resp.Body, maxFFmpegZipSize)) + if err != nil { + return "", fmt.Errorf("download read failed: %w", err) + } + + name := "ffmpeg" + if runtime.GOOS == "windows" { + name = "ffmpeg.exe" + } + + binary, err := extractFromZip(zipData, name) + if err != nil { + return "", err + } + + if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil { + return "", fmt.Errorf("cannot create cache directory: %w", err) + } + + if err := os.WriteFile(dest, binary, 0o755); err != nil { + return "", fmt.Errorf("cannot write ffmpeg binary: %w", err) + } + + fmt.Fprintf(os.Stderr, "ffmpeg installed to %s\n", dest) + return dest, nil +} + +// resolveFFmpegURL fetches the ffbinaries index and returns the ffmpeg +// download URL for the requested platform key (e.g. "linux-64"). +func resolveFFmpegURL(platform string) (string, error) { + resp, err := ffprobeAPIClient.Get(ffbinariesAPI) + if err != nil { + return "", fmt.Errorf("cannot reach ffbinaries.com: %w", err) + } + defer resp.Body.Close() + + var data ffbinariesResponse + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return "", fmt.Errorf("cannot parse ffbinaries response: %w", err) + } + + bins, ok := data.Bin[platform] + if !ok { + return "", fmt.Errorf("no ffmpeg binary available for platform %q", platform) + } + + url, ok := bins["ffmpeg"] + if !ok { + return "", fmt.Errorf("no ffmpeg download URL for platform %q", platform) + } + + return url, nil +} diff --git a/internal/library/mediainfo/ffmpeg_test.go b/internal/library/mediainfo/ffmpeg_test.go new file mode 100644 index 0000000..f2dd9af --- /dev/null +++ b/internal/library/mediainfo/ffmpeg_test.go @@ -0,0 +1,78 @@ +package mediainfo + +import ( + "os" + "path/filepath" + "runtime" + "testing" +) + +// TestResolveFFmpeg_ExplicitOK verifies the explicit-path branch returns +// the requested binary if it exists on disk. +func TestResolveFFmpeg_ExplicitOK(t *testing.T) { + dir := t.TempDir() + fake := filepath.Join(dir, "ffmpeg") + if err := os.WriteFile(fake, []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatalf("write fake: %v", err) + } + + got, err := ResolveFFmpeg(fake) + if err != nil { + t.Fatalf("ResolveFFmpeg(explicit): %v", err) + } + if got != fake { + t.Fatalf("got %q want %q", got, fake) + } +} + +// TestResolveFFmpeg_ExplicitMissing returns a clear error when the path +// the operator supplied doesn't exist — we do NOT silently fall back. +func TestResolveFFmpeg_ExplicitMissing(t *testing.T) { + _, err := ResolveFFmpeg("/nonexistent/path/ffmpeg-XXXXXX") + if err == nil { + t.Fatal("expected error for missing explicit path") + } +} + +// TestResolveFFmpeg_EnvVar honours FFMPEG_PATH when no explicit path is given. +func TestResolveFFmpeg_EnvVar(t *testing.T) { + dir := t.TempDir() + fake := filepath.Join(dir, "ffmpeg") + if err := os.WriteFile(fake, []byte("#!/bin/sh\n"), 0o755); err != nil { + t.Fatalf("write fake: %v", err) + } + t.Setenv("FFMPEG_PATH", fake) + // Hide the real ffmpeg from PATH so the env var is the next branch hit. + t.Setenv("PATH", "/nonexistent") + + got, err := ResolveFFmpeg("") + if err != nil { + t.Fatalf("ResolveFFmpeg(env): %v", err) + } + if got != fake { + t.Fatalf("got %q want %q (env-var branch)", got, fake) + } +} + +// TestFFmpegCachePath returns a sibling path to the ffprobe cache, +// consistent with the install layout the tarball produces. +func TestFFmpegCachePath(t *testing.T) { + got, err := FFmpegCachePath() + if err != nil { + t.Fatalf("FFmpegCachePath: %v", err) + } + want := "ffmpeg" + if runtime.GOOS == "windows" { + want = "ffmpeg.exe" + } + if filepath.Base(got) != want { + t.Fatalf("cache path basename = %q want %q", filepath.Base(got), want) + } + probeCache, err := FFprobeCachePath() + if err != nil { + t.Fatalf("FFprobeCachePath: %v", err) + } + if filepath.Dir(got) != filepath.Dir(probeCache) { + t.Fatalf("ffmpeg cache (%s) and ffprobe cache (%s) should share a directory", got, probeCache) + } +} From e68b127acc4a4b7bf8328e6beb7406f62b509faa Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 11:26:01 +0200 Subject: [PATCH 05/88] feat(release): bundle ffmpeg + ffprobe in tarballs and Docker image MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operators no longer have to install ffmpeg manually. Both the release tarballs (5 platforms × 2 binaries) and the Docker image now ship a working ffmpeg + ffprobe pair adjacent to the unarr binary; ResolveFFmpeg / ResolveFFprobe pick them up via the "adjacent to executable" branch with zero configuration. Tarball bundle (scripts/download-ffmpeg-static.sh + .goreleaser.yml): - ffbinaries.com (johnvansickle / Zeranoe-style static GPL builds) for linux-amd64, linux-arm64, darwin-amd64, windows-amd64 - evermeet.cx universal Mach-O for darwin-arm64 (ffbinaries lacks it) - BtbN/FFmpeg-Builds for windows-arm64 (ffbinaries lacks it) - Idempotent fetch with curl --retry 5 so transient github.com SSL errors don't fail the goreleaser before-hook - New `before.hooks` runs the script automatically per release; archive files glob `dist-ffbinaries/{{ .Os }}-{{ .Arch }}/*` + strip_parent - Migrated to non-deprecated `formats: [tar.gz]` / `formats: [zip]` - Verified via `goreleaser release --snapshot --clean --skip=publish` — 6 archives all carry ffmpeg + ffprobe (~60-130MB each) Docker image (Dockerfile): - Replaced the failing BtbN static glibc binaries with Alpine's native musl `apk add ffmpeg`. The static GPL builds need glibc + libmvec / libgcc_s; gcompat alone is not enough (vector-math symbols unresolved). Alpine ships ffmpeg 6.1.2 which is fine for the WebRTC transcoder. - Image size 174MB, built + ffmpeg/ffprobe/unarr smoke OK. Targets the v0.8 unarr release (per user direction — new feature, not a patch). dist-ffbinaries/ added to .gitignore. --- .gitignore | 1 + .goreleaser.yml | 22 +++++- Dockerfile | 30 ++------ scripts/download-ffmpeg-static.sh | 117 ++++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 26 deletions(-) create mode 100755 scripts/download-ffmpeg-static.sh diff --git a/.gitignore b/.gitignore index 0de3731..a6d17b3 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ Thumbs.db # GoReleaser dist/ +dist-ffbinaries/ # Docker tmp/ diff --git a/.goreleaser.yml b/.goreleaser.yml index 44656cd..0a5c821 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -2,6 +2,14 @@ version: 2 project_name: unarr +# Pre-build hook: fetch static ffmpeg + ffprobe per platform so each +# release tarball ships them adjacent to the unarr binary. ResolveFFmpeg / +# ResolveFFprobe pick them up via the "adjacent to executable" branch — no +# system install or runtime download needed. +before: + hooks: + - bash scripts/download-ffmpeg-static.sh + builds: - main: ./cmd/unarr/ binary: unarr @@ -20,11 +28,21 @@ builds: - -X github.com/torrentclaw/unarr/internal/sentry.dsn={{ .Env.SENTRY_DSN }} archives: - - format: tar.gz + - formats: [tar.gz] name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}" format_overrides: - goos: windows - format: zip + formats: [zip] + files: + - LICENSE* + - README* + # Bundle the matching ffmpeg + ffprobe (filename includes .exe on Windows + # because download-ffmpeg-static.sh writes ffmpeg.exe / ffprobe.exe there). + - src: "dist-ffbinaries/{{ .Os }}-{{ .Arch }}/*" + dst: . + strip_parent: true + info: + mode: 0o755 checksum: name_template: "checksums.txt" diff --git a/Dockerfile b/Dockerfile index f0e816f..1773622 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,25 +1,3 @@ -# ---- ffprobe static binary stage ---- -# Download a static ffprobe build from BtbN/FFmpeg-Builds (GitHub CDN, reliable). -FROM alpine:3.22 AS ffprobe-dl - -RUN apk add --no-cache curl xz - -RUN ARCH=$(uname -m) && \ - case "$ARCH" in \ - x86_64) SLUG="linux64" ;; \ - aarch64) SLUG="linuxarm64" ;; \ - *) echo "Unsupported arch: $ARCH" && exit 1 ;; \ - esac && \ - curl -fsSL --retry 3 --retry-delay 5 \ - "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-${SLUG}-gpl.tar.xz" \ - -o /tmp/ff.tar.xz && \ - mkdir /tmp/ffbuild && \ - tar xJ -f /tmp/ff.tar.xz --strip-components=1 -C /tmp/ffbuild/ && \ - mv /tmp/ffbuild/bin/ffprobe /usr/local/bin/ffprobe && \ - chmod +x /usr/local/bin/ffprobe && \ - rm -rf /tmp/ff.tar.xz /tmp/ffbuild && \ - ffprobe -version | head -1 - # ---- Build stage ---- FROM golang:1.25-alpine AS builder @@ -40,8 +18,13 @@ RUN CGO_ENABLED=0 go build -ldflags="-s -w -X github.com/torrentclaw/unarr/inter # ---- Runtime stage ---- FROM alpine:3.22 +# Use Alpine's native musl ffmpeg + ffprobe instead of the johnvansickle / +# BtbN static glibc builds — those need a glibc shim on Alpine and the +# vector-math symbols the GPL builds reference are not satisfiable by +# gcompat. Alpine ships ffmpeg ~7.x which is fine for the WebRTC +# transcoding pipeline (libx264 + libfdk-aac alternatives included). RUN apk upgrade --no-cache && \ - apk add --no-cache ca-certificates tzdata + apk add --no-cache ca-certificates tzdata ffmpeg # Non-root user (UID 1000 matches typical host user for volume permissions) RUN addgroup -g 1000 unarr && adduser -u 1000 -G unarr -D -h /home/unarr unarr @@ -53,7 +36,6 @@ RUN mkdir -p /config /downloads /data && \ USER unarr COPY --from=builder /unarr /usr/local/bin/unarr -COPY --from=ffprobe-dl /usr/local/bin/ffprobe /usr/local/bin/ffprobe # Environment: point config/data to container paths ENV UNARR_CONFIG_DIR=/config diff --git a/scripts/download-ffmpeg-static.sh b/scripts/download-ffmpeg-static.sh new file mode 100755 index 0000000..719fcde --- /dev/null +++ b/scripts/download-ffmpeg-static.sh @@ -0,0 +1,117 @@ +#!/usr/bin/env bash +# scripts/download-ffmpeg-static.sh — fetch static ffmpeg + ffprobe binaries +# for every platform we ship. Run by goreleaser's `before.hooks` so each +# tarball can bundle the binaries adjacent to `unarr`. +# +# Source: https://ffbinaries.com (same index the runtime fallback uses). +# Output: +# dist-ffbinaries/-/{ffmpeg, ffprobe}[.exe] +# Idempotent: skips downloads when the target file already exists. + +set -euo pipefail + +# Map ffbinaries platform key → goreleaser {Os}-{Arch}. ffbinaries.com only +# ships an x86_64 macOS build; for darwin-arm64 we fall back to evermeet.cx +# universal binaries (handled separately below). +PLATFORMS=( + "linux-64:linux-amd64" + "linux-arm64:linux-arm64" + "osx-64:darwin-amd64" + "windows-64:windows-amd64" +) +DEST_ROOT="${FFBINARIES_DEST:-dist-ffbinaries}" +INDEX_URL="https://ffbinaries.com/api/v1/version/latest" + +for cmd in curl jq unzip; do + command -v "$cmd" >/dev/null 2>&1 || { + echo "[ffbin] missing required tool: $cmd" >&2 + exit 2 + } +done + +mkdir -p "$DEST_ROOT" + +echo "[ffbin] fetching index from $INDEX_URL" +INDEX_JSON="$(curl -fsSL "$INDEX_URL")" +VERSION="$(echo "$INDEX_JSON" | jq -r .version)" +echo "[ffbin] ffbinaries version: $VERSION" + +for entry in "${PLATFORMS[@]}"; do + ffbkey="${entry%%:*}" + goplat="${entry##*:}" + outdir="$DEST_ROOT/$goplat" + mkdir -p "$outdir" + + for tool in ffmpeg ffprobe; do + binname="$tool" + [[ "$goplat" == windows-* ]] && binname="${tool}.exe" + + if [ -f "$outdir/$binname" ]; then + echo "[ffbin] skip $goplat/$binname (already present)" + continue + fi + + url="$(echo "$INDEX_JSON" | jq -r ".bin[\"$ffbkey\"][\"$tool\"] // empty")" + if [ -z "$url" ]; then + echo "[ffbin] WARN $goplat/$tool: no download URL in index" >&2 + continue + fi + + tmpzip="$(mktemp --suffix=.zip)" + echo "[ffbin] fetch $goplat/$tool from $url" + curl -fsSL --retry 5 --retry-delay 3 --retry-all-errors "$url" -o "$tmpzip" + unzip -p "$tmpzip" "$binname" > "$outdir/$binname" + chmod +x "$outdir/$binname" + rm -f "$tmpzip" + done +done + +# --- darwin-arm64 via evermeet.cx (universal binary; ffbinaries lacks it) --- +darwin_arm_dir="$DEST_ROOT/darwin-arm64" +mkdir -p "$darwin_arm_dir" +for tool in ffmpeg ffprobe; do + out="$darwin_arm_dir/$tool" + if [ -f "$out" ]; then + echo "[ffbin] skip darwin-arm64/$tool (already present)" + continue + fi + url="https://evermeet.cx/ffmpeg/getrelease/$tool/zip" + tmpzip="$(mktemp --suffix=.zip)" + echo "[ffbin] fetch darwin-arm64/$tool from $url" + curl -fsSL --retry 5 --retry-delay 3 --retry-all-errors "$url" -o "$tmpzip" + unzip -p "$tmpzip" "$tool" > "$out" + chmod +x "$out" + rm -f "$tmpzip" +done + +# --- windows-arm64 via BtbN/FFmpeg-Builds (ffbinaries lacks it) --- +# BtbN ships a single zip per platform with ffmpeg.exe + ffprobe.exe under +# ffmpeg-master-latest-winarm64-gpl/bin/. Extract both in one fetch. +win_arm_dir="$DEST_ROOT/windows-arm64" +mkdir -p "$win_arm_dir" +needs_win_arm=0 +for tool in ffmpeg.exe ffprobe.exe; do + [ -f "$win_arm_dir/$tool" ] || needs_win_arm=1 +done +if [ "$needs_win_arm" = "1" ]; then + url="https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-winarm64-gpl.zip" + tmpzip="$(mktemp --suffix=.zip)" + echo "[ffbin] fetch windows-arm64/{ffmpeg,ffprobe}.exe from $url" + curl -fsSL --retry 5 --retry-delay 3 --retry-all-errors "$url" -o "$tmpzip" + for tool in ffmpeg.exe ffprobe.exe; do + out="$win_arm_dir/$tool" + member="$(unzip -Z1 "$tmpzip" "*/bin/$tool" 2>/dev/null | head -1)" + if [ -z "$member" ]; then + echo "[ffbin] WARN windows-arm64/$tool: not found in BtbN zip" >&2 + continue + fi + unzip -p "$tmpzip" "$member" > "$out" + chmod +x "$out" + done + rm -f "$tmpzip" +else + echo "[ffbin] skip windows-arm64 (already present)" +fi + +echo "[ffbin] done. layout:" +find "$DEST_ROOT" -type f -printf " %p (%s bytes)\n" From 75dcc0f1cb091e121db693d75ff0034be4f9d2b0 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 11:34:57 +0200 Subject: [PATCH 06/88] feat(streaming): ffmpeg transcoding pipeline (direct play / fMP4 / HW accel) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The browser-side WebRTC reproductor needs MP4 / H.264 / AAC / yuv420p to keep MSE happy. This package decides per request whether to: • direct-play — input already MSE-compatible, just remux to fMP4 • transcode — re-encode video (libx264 / NVENC / QSV / VAAPI / VideoToolbox) + audio (AAC), fragment to fMP4 Pieces: - internal/streaming/transcoder.go — AnalyzeCompatibility decides the recipe from a parsed mediainfo. CompatibilityReport carries the reasons so the player UI can show "transcoding video: HEVC → H.264". - internal/streaming/ffmpeg_args.go — BuildFFmpegArgs assembles the argv for ffmpeg. Direct play uses `-c copy`; transcode uses libx264 or the selected HW encoder. Output is always fragmented MP4 piped to stdout (-movflags frag_keyframe+empty_moov+default_base_moof) so the HTTP handler can stream straight to the browser without disk I/O. Quality ladder: 480p (1.5Mb), 720p (3.5Mb), 1080p (6Mb), 2160p (25Mb). Default 1080p when unset / unknown. -ss seek for resume / scrubbing. - internal/streaming/hwaccel.go — DetectHWAccel runs `ffmpeg -encoders` once per process and caches the best available. Order: NVENC → QSV → VAAPI → VideoToolbox → libx264. VAAPI is the only family that wires up HW decode too (`-hwaccel vaapi`); the others software-decode and HW- encode (works fine and avoids /dev/dri permission rabbit holes). - internal/streaming/stream.go — Transcoder facade wires Analyze + Stream together for the API handler in Fase 4. Captures the last 8 KiB of ffmpeg stderr for diagnosable errors without unbounded memory. Tests (20 unit, all green): - AnalyzeCompatibility: h264+aac direct, video-only direct, HEVC → transcode, 10-bit HDR → transcode, EAC3 audio → transcode, nil guards - ResolveQuality: empty + unknown fallback to 1080p, 4-step ladder - BuildFFmpegArgs: direct play -c copy, transcode libx264 + bitrate + scale, NVENC swaps encoder & drops preset, VAAPI injects -hwaccel + scale_vaapi, -ss timestamp formatting - HWAccel: encoder-name table, VAAPI is the only one with HW decode - formatDuration: zero, sub-second, HH:MM:SS, negative-clamped - cappedBuffer: tail retention through multi-write and large-write paths - NewTranscoder: rejects empty paths --- internal/streaming/ffmpeg_args.go | 173 +++++++++++++++++ internal/streaming/hwaccel.go | 144 ++++++++++++++ internal/streaming/stream.go | 131 +++++++++++++ internal/streaming/transcoder.go | 135 +++++++++++++ internal/streaming/transcoder_test.go | 267 ++++++++++++++++++++++++++ 5 files changed, 850 insertions(+) create mode 100644 internal/streaming/ffmpeg_args.go create mode 100644 internal/streaming/hwaccel.go create mode 100644 internal/streaming/stream.go create mode 100644 internal/streaming/transcoder.go create mode 100644 internal/streaming/transcoder_test.go diff --git a/internal/streaming/ffmpeg_args.go b/internal/streaming/ffmpeg_args.go new file mode 100644 index 0000000..1869864 --- /dev/null +++ b/internal/streaming/ffmpeg_args.go @@ -0,0 +1,173 @@ +package streaming + +import ( + "fmt" + "strconv" + "time" +) + +// StreamOptions controls a single transcode/remux invocation. +type StreamOptions struct { + // Quality caps the output resolution and bitrate when transcoding. + // Direct play ignores it (the source bitrate wins). One of: + // "2160p", "1080p", "720p", "480p", "" (= "1080p"). + Quality string + + // StartOffset seeks the input N seconds in before transcoding. Useful + // for resume / scrubbing. Zero means start from the beginning. + StartOffset time.Duration + + // HW selects the hardware encoder. "" (or "none") means software libx264. + HW HWAccel + + // AudioTrackIndex selects which audio track to keep (0-based, before + // the video stream is excluded). Zero is the default track. + AudioTrackIndex int +} + +// QualityProfile maps a Quality label to encoder constraints. +type QualityProfile struct { + Label string // "1080p" + MaxHeight int // 1080 + VideoBitrate int // bits/s for libx264 -b:v + AudioBitrate int // bits/s for AAC +} + +// qualityProfiles is the full ladder. We default to 1080p when unset. +var qualityProfiles = map[string]QualityProfile{ + "2160p": {Label: "2160p", MaxHeight: 2160, VideoBitrate: 25_000_000, AudioBitrate: 192_000}, + "1080p": {Label: "1080p", MaxHeight: 1080, VideoBitrate: 6_000_000, AudioBitrate: 160_000}, + "720p": {Label: "720p", MaxHeight: 720, VideoBitrate: 3_500_000, AudioBitrate: 128_000}, + "480p": {Label: "480p", MaxHeight: 480, VideoBitrate: 1_500_000, AudioBitrate: 96_000}, +} + +// ResolveQuality returns the QualityProfile for a label, falling back to +// 1080p when the label is empty / unknown. +func ResolveQuality(label string) QualityProfile { + if p, ok := qualityProfiles[label]; ok { + return p + } + return qualityProfiles["1080p"] +} + +// fragmentedMP4Movflags are the magic flags MSE needs to consume an +// ffmpeg pipe as it's produced — avoids the moov atom being written at the +// end of the file (which would force buffering the whole stream). +const fragmentedMP4Movflags = "frag_keyframe+empty_moov+default_base_moof" + +// BuildFFmpegArgs returns the argv (without the binary itself) for +// ffmpeg given the input file, stream options, and a compatibility report. +// +// Two recipes: +// +// - Direct play: -c copy on every selected stream + remux to fMP4. +// - Transcode: re-encode video (libx264 / hwaccel) + audio (aac). +// +// The result writes fMP4 fragments to stdout (`pipe:1`) so the HTTP +// handler can stream them directly to the browser without touching disk. +func BuildFFmpegArgs(inputPath string, report CompatibilityReport, opts StreamOptions) []string { + args := []string{ + "-hide_banner", + "-loglevel", "warning", + "-nostdin", + } + + if opts.HW.HasDecoder() { + args = append(args, opts.HW.DecoderArgs()...) + } + + if opts.StartOffset > 0 { + args = append(args, "-ss", formatDuration(opts.StartOffset)) + } + + args = append(args, "-i", inputPath) + + // Map first video + selected audio. Drop subtitles (browser handles + // them out-of-band; baking them in is a Phase 4.x decision). + args = append(args, + "-map", "0:v:0", + "-map", fmt.Sprintf("0:a:%d?", opts.AudioTrackIndex), + ) + + if report.DirectPlay { + // Cheap path: copy streams, just remux container. + args = append(args, "-c", "copy") + } else { + // Transcode path: pick encoder per HW. + profile := ResolveQuality(opts.Quality) + args = append(args, transcodeArgs(profile, opts.HW)...) + } + + args = append(args, + "-movflags", fragmentedMP4Movflags, + "-f", "mp4", + "pipe:1", + ) + return args +} + +// transcodeArgs returns the encoder + bitrate flags. Keeps the function +// flat so the BuildFFmpegArgs reader can scan the recipe top to bottom. +func transcodeArgs(profile QualityProfile, hw HWAccel) []string { + args := []string{} + + // Video encoder. + args = append(args, "-c:v", hw.VideoEncoder()) + + // Scale filter caps the long edge to MaxHeight, preserving aspect. + // `force_original_aspect_ratio=decrease` keeps it ≤ MaxHeight when + // the source is taller and leaves smaller sources untouched. The + // `force_divisible_by=2` keeps libx264 happy. + scale := fmt.Sprintf( + "scale=-2:%d:force_original_aspect_ratio=decrease:force_divisible_by=2", + profile.MaxHeight, + ) + if hw == HWAccelVAAPI { + // VAAPI needs frames in the GPU surface, scaling is done with + // scale_vaapi. We still upload via format=nv12. + scale = fmt.Sprintf("format=nv12,hwupload,scale_vaapi=-2:%d", profile.MaxHeight) + } + args = append(args, "-vf", scale) + + // Bitrate ceiling (variable bitrate with 2× burst). + args = append(args, + "-b:v", strconv.Itoa(profile.VideoBitrate), + "-maxrate", strconv.Itoa(profile.VideoBitrate*2), + "-bufsize", strconv.Itoa(profile.VideoBitrate*4), + ) + + // SW-only: tune for low latency + don't waste cycles on the deepest + // preset when we're feeding live playback. + if hw == HWAccelNone || hw == HWAccelUnset { + args = append(args, + "-preset", "veryfast", + "-tune", "zerolatency", + ) + } + + // Force yuv420p so MSE reliably plays the result (some libx264 + // configurations otherwise emit yuv422p for SD content). + args = append(args, "-pix_fmt", "yuv420p") + + // Audio: re-encode to AAC stereo. Mono / 5.1 sources are downmixed. + args = append(args, + "-c:a", "aac", + "-b:a", strconv.Itoa(profile.AudioBitrate), + "-ac", "2", + ) + + return args +} + +// formatDuration prints a Go Duration as ffmpeg's `-ss HH:MM:SS.mmm`. +func formatDuration(d time.Duration) string { + if d < 0 { + d = 0 + } + h := int(d / time.Hour) + d -= time.Duration(h) * time.Hour + m := int(d / time.Minute) + d -= time.Duration(m) * time.Minute + s := float64(d) / float64(time.Second) + return fmt.Sprintf("%02d:%02d:%06.3f", h, m, s) +} diff --git a/internal/streaming/hwaccel.go b/internal/streaming/hwaccel.go new file mode 100644 index 0000000..1c8dff6 --- /dev/null +++ b/internal/streaming/hwaccel.go @@ -0,0 +1,144 @@ +package streaming + +import ( + "context" + "os/exec" + "runtime" + "strings" + "sync" + "time" +) + +// HWAccel identifies which hardware encoder family the host can use. +type HWAccel string + +const ( + HWAccelUnset HWAccel = "" + HWAccelNone HWAccel = "none" // explicit software libx264 + HWAccelNVENC HWAccel = "nvenc" // NVIDIA GPUs + HWAccelQSV HWAccel = "qsv" // Intel Quick Sync (Linux/Win) + HWAccelVAAPI HWAccel = "vaapi" // Intel/AMD GPUs on Linux + HWAccelVideoToolbox HWAccel = "videotoolbox" // macOS native +) + +// VideoEncoder returns the ffmpeg `-c:v` argument for this accelerator. +func (h HWAccel) VideoEncoder() string { + switch h { + case HWAccelNVENC: + return "h264_nvenc" + case HWAccelQSV: + return "h264_qsv" + case HWAccelVAAPI: + return "h264_vaapi" + case HWAccelVideoToolbox: + return "h264_videotoolbox" + default: + return "libx264" + } +} + +// HasDecoder reports whether the accelerator also supports HW decode. +// We always feed encoders software-decoded frames except for VAAPI where +// the GPU pipeline expects HW-decoded surfaces end-to-end. +func (h HWAccel) HasDecoder() bool { + return h == HWAccelVAAPI +} + +// DecoderArgs returns the ffmpeg flags that enable HW decode for this +// accelerator. Only meaningful when HasDecoder() == true. +func (h HWAccel) DecoderArgs() []string { + if h == HWAccelVAAPI { + return []string{ + "-hwaccel", "vaapi", + "-hwaccel_device", "/dev/dri/renderD128", + "-hwaccel_output_format", "vaapi", + } + } + return nil +} + +// detectedHWAccel caches the result of DetectHWAccel so we don't fork +// ffmpeg on every transcode request. +var ( + detectedHWAccelOnce sync.Once + detectedHWAccel HWAccel +) + +// DetectHWAccel asks ffmpeg what encoders it supports and returns the +// best available. Result is cached for the process lifetime — callers +// should construct the Transcoder once and reuse it. +// +// Detection order (best perf → fallback): +// 1. NVENC (NVIDIA GPU + CUDA driver) +// 2. QSV (Intel iGPU/dGPU + libmfx/intel-media-driver) +// 3. VAAPI (Linux Intel/AMD via /dev/dri) +// 4. VideoToolbox (macOS only) +// 5. None (fallback to libx264 software) +func DetectHWAccel(ctx context.Context, ffmpegPath string) HWAccel { + detectedHWAccelOnce.Do(func() { + detectedHWAccel = doDetectHWAccel(ctx, ffmpegPath) + }) + return detectedHWAccel +} + +// ResetHWAccelCache forces the next DetectHWAccel call to re-probe. +// Intended for tests. +func ResetHWAccelCache() { + detectedHWAccelOnce = sync.Once{} + detectedHWAccel = HWAccelUnset +} + +func doDetectHWAccel(ctx context.Context, ffmpegPath string) HWAccel { + if ctx == nil { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + } + + // macOS videotoolbox is reliable enough that we don't bother probing + // — every Apple Silicon Mac has it; Intel Macs since 10.13 do too. + if runtime.GOOS == "darwin" { + if encoderAvailable(ctx, ffmpegPath, "h264_videotoolbox") { + return HWAccelVideoToolbox + } + } + + for _, candidate := range []struct { + Name HWAccel + Encoder string + }{ + {HWAccelNVENC, "h264_nvenc"}, + {HWAccelQSV, "h264_qsv"}, + {HWAccelVAAPI, "h264_vaapi"}, + } { + if encoderAvailable(ctx, ffmpegPath, candidate.Encoder) { + return candidate.Name + } + } + + return HWAccelNone +} + +// encoderAvailable returns true when `ffmpeg -hide_banner -encoders` +// lists the named encoder. +// +// Note: this only verifies ffmpeg was COMPILED with the encoder. It does +// NOT guarantee the host hardware works at runtime — some users will see +// libx264 fall back at the first failed encode. That's OK; the worst +// case is a one-time slow request. +func encoderAvailable(ctx context.Context, ffmpegPath, encoder string) bool { + cmd := exec.CommandContext(ctx, ffmpegPath, "-hide_banner", "-encoders") + out, err := cmd.Output() + if err != nil { + return false + } + for _, line := range strings.Split(string(out), "\n") { + // `-encoders` output looks like: + // V..... libx264 libx264 H.264 / AVC / MPEG-4 AVC + fields := strings.Fields(line) + if len(fields) >= 2 && fields[1] == encoder { + return true + } + } + return false +} diff --git a/internal/streaming/stream.go b/internal/streaming/stream.go new file mode 100644 index 0000000..67d956e --- /dev/null +++ b/internal/streaming/stream.go @@ -0,0 +1,131 @@ +package streaming + +import ( + "context" + "errors" + "fmt" + "io" + "os/exec" + "sync" + + "github.com/torrentclaw/unarr/internal/library/mediainfo" +) + +// Transcoder owns the resolved ffmpeg / ffprobe binaries plus the +// detected hardware accelerator. One per process; safe for concurrent use. +type Transcoder struct { + ffmpegPath string + ffprobePath string + + hwOnce sync.Once + hw HWAccel +} + +// NewTranscoder constructs a Transcoder from explicit binary paths. +// Both must be non-empty; resolve them upstream via +// mediainfo.ResolveFFmpeg / ResolveFFprobe. +func NewTranscoder(ffmpegPath, ffprobePath string) (*Transcoder, error) { + if ffmpegPath == "" { + return nil, errors.New("streaming: ffmpeg path is required") + } + if ffprobePath == "" { + return nil, errors.New("streaming: ffprobe path is required") + } + return &Transcoder{ + ffmpegPath: ffmpegPath, + ffprobePath: ffprobePath, + }, nil +} + +// HWAccel returns the cached / detected hardware accelerator. First call +// runs `ffmpeg -encoders`; subsequent calls reuse the result. +func (t *Transcoder) HWAccel(ctx context.Context) HWAccel { + t.hwOnce.Do(func() { + t.hw = DetectHWAccel(ctx, t.ffmpegPath) + }) + return t.hw +} + +// Analyze runs ffprobe on the input file and returns a compatibility +// report so the caller can decide direct play vs transcode. +func (t *Transcoder) Analyze(ctx context.Context, inputPath string) (CompatibilityReport, *mediainfo.MediaInfo, error) { + info, err := mediainfo.ExtractMediaInfo(ctx, t.ffprobePath, inputPath) + if err != nil { + return CompatibilityReport{}, nil, fmt.Errorf("streaming: ffprobe failed: %w", err) + } + return AnalyzeCompatibility(info), info, nil +} + +// Stream runs ffmpeg with the right recipe for the given file + options +// and writes fragmented MP4 to dst. Blocks until ffmpeg exits or the +// context is cancelled. If ffmpeg's stderr captures something useful, it's +// included in the returned error. +func (t *Transcoder) Stream(ctx context.Context, inputPath string, dst io.Writer, opts StreamOptions) error { + report, _, err := t.Analyze(ctx, inputPath) + if err != nil { + return err + } + return t.StreamWithReport(ctx, inputPath, dst, opts, report) +} + +// StreamWithReport is the lower-level entry point — accepts a +// pre-computed CompatibilityReport so the API handler can inspect the +// decision before kicking off a transcode (useful for billing / +// telemetry / quality-fallback policies). +func (t *Transcoder) StreamWithReport( + ctx context.Context, + inputPath string, + dst io.Writer, + opts StreamOptions, + report CompatibilityReport, +) error { + if opts.HW == HWAccelUnset { + opts.HW = t.HWAccel(ctx) + } + + args := BuildFFmpegArgs(inputPath, report, opts) + cmd := exec.CommandContext(ctx, t.ffmpegPath, args...) + cmd.Stdout = dst + + stderrBuf := newCappedBuffer(8 * 1024) // last 8 KiB is plenty for diagnosing + cmd.Stderr = stderrBuf + + if err := cmd.Run(); err != nil { + // Cancellation looks like an exec error too; surface the cause + // so callers don't blame ffmpeg for client disconnects. + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + return fmt.Errorf("streaming: ffmpeg exited: %w (stderr tail: %s)", err, stderrBuf.String()) + } + return nil +} + +// cappedBuffer is an io.Writer that keeps only the last `cap` bytes +// written. Used to capture ffmpeg's tail stderr for error reporting +// without unbounded memory growth on long transcodes. +type cappedBuffer struct { + buf []byte + cap int +} + +func newCappedBuffer(cap int) *cappedBuffer { + return &cappedBuffer{cap: cap} +} + +func (c *cappedBuffer) Write(p []byte) (int, error) { + if len(p) >= c.cap { + c.buf = append(c.buf[:0], p[len(p)-c.cap:]...) + return len(p), nil + } + if len(c.buf)+len(p) > c.cap { + drop := len(c.buf) + len(p) - c.cap + c.buf = c.buf[drop:] + } + c.buf = append(c.buf, p...) + return len(p), nil +} + +func (c *cappedBuffer) String() string { + return string(c.buf) +} diff --git a/internal/streaming/transcoder.go b/internal/streaming/transcoder.go new file mode 100644 index 0000000..8daa786 --- /dev/null +++ b/internal/streaming/transcoder.go @@ -0,0 +1,135 @@ +// Package streaming wraps ffmpeg for the WebRTC-streaming pipeline. +// +// The browser-side reproductor lives on torrentclaw.com and consumes +// fragmented MP4 (fMP4) chunks via Media Source Extensions (MSE). MSE is +// strict about codecs: H.264 / VP8 / VP9 / AV1 video + AAC / Opus / MP3 +// audio + MP4 / WebM container. Anything else (HEVC/x265, MKV, EAC3, FLAC, +// 10-bit H.264, …) needs transcoding. +// +// The transcoder picks one of two paths per request: +// +// - Direct play — input is already MSE-compatible. Container is remuxed +// to fragmented MP4 with the audio + video streams copied. Cheap: +// ~no CPU, ~no memory. +// +// - Transcode — input is incompatible. Re-encode video to H.264 +// (libx264 sw / h264_nvenc / h264_qsv / h264_vaapi / h264_videotoolbox +// depending on what the host supports) and audio to AAC. Expensive: +// 1× core for 1080p sw, ~free with HW accel. +package streaming + +import ( + "github.com/torrentclaw/unarr/internal/library/mediainfo" +) + +// browserVideoCodecs lists video codecs the player can render natively +// without transcoding. Names match ffprobe's `codec_name`. +var browserVideoCodecs = map[string]struct{}{ + "h264": {}, + "vp8": {}, + "vp9": {}, + "av1": {}, +} + +// browserAudioCodecs lists audio codecs the player accepts natively. +var browserAudioCodecs = map[string]struct{}{ + "aac": {}, + "opus": {}, + "mp3": {}, +} + +// browserPixelFormats lists pixel formats MSE H.264 reliably decodes +// in-browser. 10-bit / 12-bit profiles are rejected because Safari + most +// Chromium versions software-decode them at 1-2 fps. +var browserPixelFormats = map[string]struct{}{ + "yuv420p": {}, + "yuvj420p": {}, +} + +// CompatibilityReport explains why a file is or isn't direct-playable. +// Returned by AnalyzeCompatibility so the caller can show actionable +// feedback (e.g. "transcoding video: HEVC → H.264"). +type CompatibilityReport struct { + DirectPlay bool + VideoCompat bool + AudioCompat bool + Container string // input container hint (best effort) + VideoCodec string + AudioCodec string + PixelFormat string + BitDepth int + Reasons []string // human-readable list of mismatches; empty when DirectPlay +} + +// AnalyzeCompatibility inspects a parsed mediainfo and decides whether the +// stream needs transcoding. It does NOT touch disk or run ffmpeg. +// +// Direct play requires ALL of: +// - Video codec ∈ {h264, vp8, vp9, av1} +// - Pixel format ∈ {yuv420p, yuvj420p} +// - Bit depth ≤ 8 +// - Audio codec ∈ {aac, opus, mp3} +// +// First audio track wins for the compatibility decision; later tracks are +// repacked along with it. Container is intentionally ignored — even MKV +// carrying H.264 + AAC can be remuxed to fMP4 cheaply, so it's not worth +// failing direct-play on container alone. +func AnalyzeCompatibility(info *mediainfo.MediaInfo) CompatibilityReport { + r := CompatibilityReport{} + if info == nil || info.Video == nil { + r.Reasons = append(r.Reasons, "missing video stream metadata") + return r + } + + r.VideoCodec = info.Video.Codec + r.PixelFormat = pixelFormatFor(info.Video) + r.BitDepth = info.Video.BitDepth + + _, vcOK := browserVideoCodecs[r.VideoCodec] + r.VideoCompat = vcOK + if !vcOK { + r.Reasons = append(r.Reasons, + "video codec "+r.VideoCodec+" not playable in browser") + } + if r.BitDepth > 8 { + r.VideoCompat = false + r.Reasons = append(r.Reasons, "video bit depth >8 (HDR / 10-bit)") + } + if r.PixelFormat != "" { + if _, ok := browserPixelFormats[r.PixelFormat]; !ok { + r.VideoCompat = false + r.Reasons = append(r.Reasons, + "pixel format "+r.PixelFormat+" not playable in browser") + } + } + + if len(info.Audio) > 0 { + r.AudioCodec = info.Audio[0].Codec + _, acOK := browserAudioCodecs[r.AudioCodec] + r.AudioCompat = acOK + if !acOK { + r.Reasons = append(r.Reasons, + "audio codec "+r.AudioCodec+" not playable in browser") + } + } else { + // No audio track — direct play allowed for video-only streams. + r.AudioCompat = true + } + + r.DirectPlay = r.VideoCompat && r.AudioCompat + return r +} + +// pixelFormatFor returns a best-effort pixel format string for a VideoInfo. +// mediainfo doesn't carry pix_fmt explicitly today, so we infer from the +// HDR flag: HDR streams are 10-bit yuv420p10le (incompatible by definition) +// while everything else is assumed yuv420p. +// +// Once mediainfo grows a PixFmt field we replace this heuristic with the +// raw value. +func pixelFormatFor(v *mediainfo.VideoInfo) string { + if v.HDR != "" || v.BitDepth >= 10 { + return "yuv420p10le" + } + return "yuv420p" +} diff --git a/internal/streaming/transcoder_test.go b/internal/streaming/transcoder_test.go new file mode 100644 index 0000000..42d4979 --- /dev/null +++ b/internal/streaming/transcoder_test.go @@ -0,0 +1,267 @@ +package streaming + +import ( + "strings" + "testing" + "time" + + "github.com/torrentclaw/unarr/internal/library/mediainfo" +) + +// AnalyzeCompatibility — direct play happy paths. +func TestAnalyzeCompatibility_DirectPlayH264AAC(t *testing.T) { + info := &mediainfo.MediaInfo{ + Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 8}, + Audio: []mediainfo.AudioTrack{{Codec: "aac", Channels: 2}}, + } + r := AnalyzeCompatibility(info) + if !r.DirectPlay { + t.Fatalf("h264+aac must be direct-playable, got %+v", r) + } + if len(r.Reasons) != 0 { + t.Fatalf("direct play should have no reasons, got %v", r.Reasons) + } +} + +func TestAnalyzeCompatibility_DirectPlayVideoOnly(t *testing.T) { + info := &mediainfo.MediaInfo{ + Video: &mediainfo.VideoInfo{Codec: "vp9", BitDepth: 8}, + } + r := AnalyzeCompatibility(info) + if !r.DirectPlay { + t.Fatalf("video-only vp9 must be direct-playable, got %+v", r) + } +} + +// AnalyzeCompatibility — transcode required. +func TestAnalyzeCompatibility_TranscodeHEVC(t *testing.T) { + info := &mediainfo.MediaInfo{ + Video: &mediainfo.VideoInfo{Codec: "hevc", BitDepth: 8}, + Audio: []mediainfo.AudioTrack{{Codec: "aac"}}, + } + r := AnalyzeCompatibility(info) + if r.DirectPlay { + t.Fatalf("HEVC must NOT be direct-playable") + } + if !strings.Contains(strings.Join(r.Reasons, ";"), "hevc") { + t.Fatalf("expected reason mentioning hevc, got %v", r.Reasons) + } +} + +func TestAnalyzeCompatibility_TranscodeHDR10bit(t *testing.T) { + info := &mediainfo.MediaInfo{ + Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 10, HDR: "HDR10"}, + Audio: []mediainfo.AudioTrack{{Codec: "aac"}}, + } + r := AnalyzeCompatibility(info) + if r.DirectPlay { + t.Fatalf("10-bit HDR10 must NOT be direct-playable") + } +} + +func TestAnalyzeCompatibility_TranscodeEAC3Audio(t *testing.T) { + info := &mediainfo.MediaInfo{ + Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 8}, + Audio: []mediainfo.AudioTrack{{Codec: "eac3", Channels: 6}}, + } + r := AnalyzeCompatibility(info) + if r.DirectPlay { + t.Fatalf("EAC3 audio must trigger transcode") + } + if r.VideoCompat != true { + t.Fatalf("video stayed h264 — VideoCompat should still be true; got %+v", r) + } +} + +func TestAnalyzeCompatibility_NilGuard(t *testing.T) { + r := AnalyzeCompatibility(nil) + if r.DirectPlay { + t.Fatal("nil MediaInfo must not be direct-playable") + } + r2 := AnalyzeCompatibility(&mediainfo.MediaInfo{Video: nil}) + if r2.DirectPlay { + t.Fatal("MediaInfo without video must not be direct-playable") + } +} + +// ResolveQuality — fallback + table lookup. +func TestResolveQuality_FallbackTo1080p(t *testing.T) { + got := ResolveQuality("") + if got.Label != "1080p" { + t.Fatalf("empty label fallback wrong: %s", got.Label) + } + got = ResolveQuality("garbage") + if got.Label != "1080p" { + t.Fatalf("unknown label fallback wrong: %s", got.Label) + } +} + +func TestResolveQuality_KnownLabels(t *testing.T) { + cases := map[string]int{ + "480p": 480, + "720p": 720, + "1080p": 1080, + "2160p": 2160, + } + for label, height := range cases { + got := ResolveQuality(label) + if got.MaxHeight != height { + t.Errorf("ResolveQuality(%q).MaxHeight = %d want %d", label, got.MaxHeight, height) + } + } +} + +// BuildFFmpegArgs — recipe shape verified by argv content. +func TestBuildFFmpegArgs_DirectPlayUsesCopy(t *testing.T) { + report := CompatibilityReport{DirectPlay: true, VideoCompat: true, AudioCompat: true} + args := BuildFFmpegArgs("/tmp/movie.mp4", report, StreamOptions{}) + joined := strings.Join(args, " ") + + want := []string{"-i /tmp/movie.mp4", "-c copy", "-movflags " + fragmentedMP4Movflags, "-f mp4", "pipe:1"} + for _, w := range want { + if !strings.Contains(joined, w) { + t.Fatalf("direct-play argv missing %q\n got: %s", w, joined) + } + } + if strings.Contains(joined, "libx264") { + t.Fatalf("direct-play must NOT invoke libx264, got: %s", joined) + } +} + +func TestBuildFFmpegArgs_TranscodeUsesLibx264(t *testing.T) { + report := CompatibilityReport{DirectPlay: false, VideoCompat: false, AudioCompat: true} + args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{Quality: "720p"}) + joined := strings.Join(args, " ") + + want := []string{ + "-c:v libx264", + "scale=-2:720", + "-b:v 3500000", + "-c:a aac", + "-b:a 128000", + "-pix_fmt yuv420p", + "-preset veryfast", + } + for _, w := range want { + if !strings.Contains(joined, w) { + t.Fatalf("720p transcode argv missing %q\n got: %s", w, joined) + } + } +} + +func TestBuildFFmpegArgs_NVENCSwapsEncoder(t *testing.T) { + report := CompatibilityReport{DirectPlay: false} + args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{HW: HWAccelNVENC}) + joined := strings.Join(args, " ") + + if !strings.Contains(joined, "-c:v h264_nvenc") { + t.Fatalf("NVENC must use h264_nvenc, got: %s", joined) + } + if strings.Contains(joined, "-preset veryfast") { + t.Fatalf("HW accel skips libx264 preset, got: %s", joined) + } +} + +func TestBuildFFmpegArgs_VAAPIInjectsHwaccelDecoder(t *testing.T) { + report := CompatibilityReport{DirectPlay: false} + args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{HW: HWAccelVAAPI}) + joined := strings.Join(args, " ") + + if !strings.Contains(joined, "-hwaccel vaapi") { + t.Fatalf("VAAPI must add -hwaccel vaapi, got: %s", joined) + } + if !strings.Contains(joined, "scale_vaapi") { + t.Fatalf("VAAPI must use scale_vaapi filter, got: %s", joined) + } +} + +func TestBuildFFmpegArgs_StartOffsetEmitsSS(t *testing.T) { + report := CompatibilityReport{DirectPlay: true} + args := BuildFFmpegArgs("/tmp/m.mp4", report, StreamOptions{StartOffset: 65*time.Second + 500*time.Millisecond}) + joined := strings.Join(args, " ") + + if !strings.Contains(joined, "-ss 00:01:05.500") { + t.Fatalf("expected -ss 00:01:05.500, got: %s", joined) + } +} + +// HWAccel encoders. +func TestHWAccel_VideoEncoder(t *testing.T) { + cases := map[HWAccel]string{ + HWAccelNone: "libx264", + HWAccelUnset: "libx264", + HWAccelNVENC: "h264_nvenc", + HWAccelQSV: "h264_qsv", + HWAccelVAAPI: "h264_vaapi", + HWAccelVideoToolbox: "h264_videotoolbox", + } + for hw, want := range cases { + if got := hw.VideoEncoder(); got != want { + t.Errorf("%s.VideoEncoder() = %q want %q", hw, got, want) + } + } +} + +func TestHWAccel_OnlyVAAPIHasDecoder(t *testing.T) { + for _, h := range []HWAccel{HWAccelNone, HWAccelNVENC, HWAccelQSV, HWAccelVideoToolbox} { + if h.HasDecoder() { + t.Errorf("%s shouldn't claim HW decoder", h) + } + } + if !HWAccelVAAPI.HasDecoder() { + t.Error("VAAPI should claim HW decoder") + } +} + +// formatDuration — boundary cases. +func TestFormatDuration(t *testing.T) { + cases := []struct { + in time.Duration + want string + }{ + {0, "00:00:00.000"}, + {500 * time.Millisecond, "00:00:00.500"}, + {65 * time.Second, "00:01:05.000"}, + {2*time.Hour + 3*time.Minute + 7*time.Second + 250*time.Millisecond, "02:03:07.250"}, + {-time.Second, "00:00:00.000"}, + } + for _, c := range cases { + if got := formatDuration(c.in); got != c.want { + t.Errorf("formatDuration(%v) = %q want %q", c.in, got, c.want) + } + } +} + +// cappedBuffer — overflow keeps only the tail. +func TestCappedBuffer_KeepsTail(t *testing.T) { + b := newCappedBuffer(10) + b.Write([]byte("hello ")) + b.Write([]byte("world")) + b.Write([]byte("!")) + // "hello " + "world" + "!" = 12 bytes; cap 10 → keep last 10 = "llo world!". + got := b.String() + if got != "llo world!" { + t.Fatalf("unexpected tail %q", got) + } +} + +func TestCappedBuffer_LargeSingleWrite(t *testing.T) { + b := newCappedBuffer(5) + b.Write([]byte("abcdefghij")) + if got := b.String(); got != "fghij" { + t.Fatalf("large write tail wrong: %q", got) + } +} + +// NewTranscoder rejects empty paths. +func TestNewTranscoder_RequiresBothBinaries(t *testing.T) { + if _, err := NewTranscoder("", "/usr/bin/ffprobe"); err == nil { + t.Error("expected error for empty ffmpeg path") + } + if _, err := NewTranscoder("/usr/bin/ffmpeg", ""); err == nil { + t.Error("expected error for empty ffprobe path") + } + if _, err := NewTranscoder("/usr/bin/ffmpeg", "/usr/bin/ffprobe"); err != nil { + t.Errorf("valid paths should not error: %v", err) + } +} From c2e992516259bd069ea25dc47104c11a2681be9e Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 11:35:52 +0200 Subject: [PATCH 07/88] test(streaming): integration tests with real ffmpeg (skipped without it) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three end-to-end checks that the transcoder actually produces playable output, not just plausible argv. Skip cleanly on hosts without ffmpeg on PATH so unit-test CI keeps working. - TestTranscoder_DirectPlayProducesH264 — synth h264+aac MP4 via `ffmpeg -f lavfi testsrc/sine`, run Analyze (expect direct play), Stream to disk, ffprobe the result, assert codecs are still h264+aac. - TestTranscoder_TranscodeHEVCToH264 — synth hevc+ac3 MKV, expect transcode decision, Stream to memory, ffprobe-verify the output is h264+aac. Skipped if libx265 isn't compiled in. - TestTranscoder_AnalyzeReportsRealMediaInfo — sanity check that Analyze returns a usable mediainfo (320x240, ~2s duration) the API handler can show to the player. Verified locally: PASS: TestTranscoder_DirectPlayProducesH264 (0.09s) PASS: TestTranscoder_TranscodeHEVCToH264 (0.22s) PASS: TestTranscoder_AnalyzeReportsRealMediaInfo (0.06s) --- internal/streaming/integration_test.go | 204 +++++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 internal/streaming/integration_test.go diff --git a/internal/streaming/integration_test.go b/internal/streaming/integration_test.go new file mode 100644 index 0000000..2cd0b21 --- /dev/null +++ b/internal/streaming/integration_test.go @@ -0,0 +1,204 @@ +package streaming + +import ( + "bytes" + "context" + "encoding/json" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/torrentclaw/unarr/internal/library/mediainfo" +) + +// These tests need a real ffmpeg + ffprobe on PATH. They're skipped on +// CI runners that lack them — the unit tests already pin the recipes +// deterministically. Run locally when changing the transcoder pipeline. + +func resolveBins(t *testing.T) (string, string) { + t.Helper() + ffmpeg, err := exec.LookPath("ffmpeg") + if err != nil { + t.Skip("ffmpeg not on PATH — skipping integration test") + } + ffprobe, err := exec.LookPath("ffprobe") + if err != nil { + t.Skip("ffprobe not on PATH — skipping integration test") + } + return ffmpeg, ffprobe +} + +// generateTestVideo synthesises a short MP4 for the transcoder to chew on. +// vcodec/acodec let us exercise both direct-play and transcode branches. +func generateTestVideo(t *testing.T, ffmpeg, dir, vcodec, acodec, container string) string { + t.Helper() + out := filepath.Join(dir, "sample."+container) + args := []string{ + "-hide_banner", "-loglevel", "error", "-y", + "-f", "lavfi", "-i", "testsrc=duration=2:size=320x240:rate=15", + "-f", "lavfi", "-i", "sine=frequency=440:duration=2", + "-c:v", vcodec, + } + // libx265 needs at least one keyframe; 2s @ 15fps is fine. + if vcodec == "libx265" { + args = append(args, "-x265-params", "log-level=error") + } + args = append(args, "-c:a", acodec, "-shortest", out) + cmd := exec.Command(ffmpeg, args...) + if buf, err := cmd.CombinedOutput(); err != nil { + t.Skipf("could not synthesise test video (%s/%s/%s): %v\n%s", + vcodec, acodec, container, err, buf) + } + return out +} + +// probeOutput uses ffprobe to inspect the (synthesised) transcoder output +// and returns video + audio codec names. +func probeOutput(t *testing.T, ffprobe, path string) (string, string) { + t.Helper() + cmd := exec.Command(ffprobe, + "-hide_banner", "-loglevel", "error", + "-print_format", "json", "-show_streams", path) + buf, err := cmd.Output() + if err != nil { + t.Fatalf("ffprobe %s: %v", path, err) + } + var data struct { + Streams []struct { + CodecType string `json:"codec_type"` + CodecName string `json:"codec_name"` + } `json:"streams"` + } + if err := json.Unmarshal(buf, &data); err != nil { + t.Fatalf("ffprobe parse: %v", err) + } + var v, a string + for _, s := range data.Streams { + switch s.CodecType { + case "video": + v = s.CodecName + case "audio": + a = s.CodecName + } + } + return v, a +} + +// TestTranscoder_DirectPlayProducesH264 — H.264 + AAC source → direct play +// → output keeps both codecs, just remuxed to fMP4. +func TestTranscoder_DirectPlayProducesH264(t *testing.T) { + ffmpeg, ffprobe := resolveBins(t) + dir := t.TempDir() + src := generateTestVideo(t, ffmpeg, dir, "libx264", "aac", "mp4") + + tr, err := NewTranscoder(ffmpeg, ffprobe) + if err != nil { + t.Fatalf("NewTranscoder: %v", err) + } + + report, _, err := tr.Analyze(context.Background(), src) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + if !report.DirectPlay { + t.Fatalf("h264+aac sample should be direct-playable, got %+v", report) + } + + out := filepath.Join(dir, "out.mp4") + f, err := os.Create(out) + if err != nil { + t.Fatalf("create out: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := tr.Stream(ctx, src, f, StreamOptions{HW: HWAccelNone}); err != nil { + f.Close() + t.Fatalf("Stream: %v", err) + } + f.Close() + + v, a := probeOutput(t, ffprobe, out) + if v != "h264" { + t.Fatalf("direct-play output video codec = %q want h264", v) + } + if a != "aac" { + t.Fatalf("direct-play output audio codec = %q want aac", a) + } +} + +// TestTranscoder_TranscodeHEVCToH264 — HEVC source → transcode → +// output is H.264 + AAC ready for the browser. +func TestTranscoder_TranscodeHEVCToH264(t *testing.T) { + ffmpeg, ffprobe := resolveBins(t) + dir := t.TempDir() + + // Verify libx265 available; some Alpine builds disable it. + if !encoderAvailable(context.Background(), ffmpeg, "libx265") { + t.Skip("ffmpeg lacks libx265 — skipping HEVC transcode integration") + } + src := generateTestVideo(t, ffmpeg, dir, "libx265", "ac3", "mkv") + + tr, err := NewTranscoder(ffmpeg, ffprobe) + if err != nil { + t.Fatalf("NewTranscoder: %v", err) + } + report, _, err := tr.Analyze(context.Background(), src) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + if report.DirectPlay { + t.Fatalf("hevc+ac3 sample must NOT be direct-playable") + } + + var buf bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + if err := tr.Stream(ctx, src, &buf, StreamOptions{Quality: "480p", HW: HWAccelNone}); err != nil { + t.Fatalf("Stream: %v", err) + } + + out := filepath.Join(dir, "transcoded.mp4") + if err := os.WriteFile(out, buf.Bytes(), 0o644); err != nil { + t.Fatalf("persist transcode: %v", err) + } + + v, a := probeOutput(t, ffprobe, out) + if v != "h264" { + t.Fatalf("transcoded video codec = %q want h264", v) + } + if a != "aac" { + t.Fatalf("transcoded audio codec = %q want aac", a) + } +} + +// TestTranscoder_AnalyzeReportsRealMediaInfo validates that the Transcoder +// returns a usable MediaInfo on top of the report — the API handler will +// surface duration / resolution to the player UI. +func TestTranscoder_AnalyzeReportsRealMediaInfo(t *testing.T) { + ffmpeg, ffprobe := resolveBins(t) + dir := t.TempDir() + src := generateTestVideo(t, ffmpeg, dir, "libx264", "aac", "mp4") + + tr, err := NewTranscoder(ffmpeg, ffprobe) + if err != nil { + t.Fatalf("NewTranscoder: %v", err) + } + _, info, err := tr.Analyze(context.Background(), src) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + if info == nil || info.Video == nil { + t.Fatalf("missing parsed mediainfo: %+v", info) + } + if info.Video.Width != 320 || info.Video.Height != 240 { + t.Errorf("dimensions = %dx%d want 320x240", info.Video.Width, info.Video.Height) + } + if info.Video.Duration < 1.5 || info.Video.Duration > 2.5 { + t.Errorf("duration ~2s expected, got %v", info.Video.Duration) + } + // Ensure the package types line up with mediainfo's exported model. + _ = mediainfo.MediaInfo{} +} From 2aeabe6b509b03a55b08e525bf76b717c25706bd Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 6 May 2026 14:46:38 +0200 Subject: [PATCH 08/88] =?UTF-8?q?feat(wstracker-probe):=20-seed=20FILE=20m?= =?UTF-8?q?ode=20for=20browser=20=E2=86=94=20unarr=20e2e=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the probe binary so it can do more than verify tracker reach: when given a real file, it builds a single-file torrent in memory, seeds it via the WebTorrent peer wire, and prints the magnet URI (with the WSS tracker injected). Useful for proving the end-to-end streaming path before any actual unarr daemon work lands. Internally uses anacrolix/torrent's metainfo.Info.BuildFromFilePath + bencode.Marshal to mint InfoBytes, then AddTorrent → seed loop. Piece length picked from a libtorrent-like ladder (16 KiB → 4 MiB) so the resulting torrent is interoperable with mainstream clients. Validation: synthesised a 5 s 320×240 H.264+AAC mp4 with ffmpeg (`testsrc + sine`), seeded it via this binary against the production wss://tracker.torrentclaw.com endpoint, opened the in-browser player at /stream/. Browser reported `downloaded: 105 KB / 105 KB` and rendered a working