unarr/internal/cmd/daemon.go
Deivid Soto b8d2b90370 feat(stream): serve /stream from a debrid HTTPS link (hueco #2/2a)
The daemon can now stream a session straight from a server-resolved debrid
direct URL instead of disk/torrent, delivering the "play instantáneo
cache-fast" promise for cache-confirmed torrents the user never downloaded.

- debridFileProvider: an io.ReadSeekCloser over HTTP Range — network-free
  Seek, lazy GET on Read, reopen-on-seek, a HEAD up front for the size, and
  a URL-derived name so the served Content-Type is video/mp4 (not
  octet-stream) when the web's name lacks an extension.
- OnStreamSession branches on StreamSession.DirectURL before the filePath
  checks (no local path, no ffmpeg), builds the provider in a goroutine
  (HEAD off the sync loop) and marks the session ready.
- Bump 0.10.0 -> 0.11.0 as the debrid-stream floor the web gates on.

Validated e2e against a real AllDebrid account: a cached mp4 plays 1080p in
Chrome through the agent, including the high-offset seek for a non-faststart
file's moov atom. 2b (HLS-from-URL for mkv/HEVC) + 2c (cache-fast preference
+ mid-stream fallback) remain.
2026-05-31 15:49:58 +02:00

1100 lines
38 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cmd
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/fatih/color"
"github.com/spf13/cobra"
"github.com/torrentclaw/unarr/internal/agent"
"github.com/torrentclaw/unarr/internal/config"
"github.com/torrentclaw/unarr/internal/engine"
"github.com/torrentclaw/unarr/internal/funnel"
"github.com/torrentclaw/unarr/internal/library"
"github.com/torrentclaw/unarr/internal/library/mediainfo"
"github.com/torrentclaw/unarr/internal/usenet/download"
"github.com/torrentclaw/unarr/internal/vpn"
)
// newStartCmd creates the top-level `unarr start` command.
func newStartCmd() *cobra.Command {
return &cobra.Command{
Use: "start",
Short: "Start the download daemon (foreground)",
Long: `Start the unarr daemon in the foreground.
Registers with the server, receives download tasks via periodic sync,
and executes them using the configured download method.
Supports torrent, debrid, and usenet downloads concurrently.
The daemon syncs state with the server every 3s when someone is viewing
the web dashboard, or every 60s when idle. Press Ctrl+C to stop
gracefully — active downloads get up to 30 seconds to finish.
Requires: API key, agent ID, and download directory (run 'unarr init' first).
To run as a background service, use 'unarr daemon install' instead.`,
Example: ` unarr start
unarr start --config /path/to/config.toml`,
RunE: func(cmd *cobra.Command, args []string) error {
return runDaemonStart()
},
}
}
// newStopCmd creates the top-level `unarr stop` command.
func newStopCmd() *cobra.Command {
return &cobra.Command{
Use: "stop",
Short: "Stop the running daemon",
Long: `Stop the unarr daemon gracefully.
Reads the daemon PID from the state file and sends a graceful stop signal.
Works regardless of whether the daemon was started in the foreground or as a service.
To stop a service-managed daemon and prevent auto-restart, use 'unarr daemon stop' instead.`,
Example: ` unarr stop`,
RunE: func(cmd *cobra.Command, args []string) error {
return stopDaemonByPID()
},
}
}
// newDaemonCmd creates `unarr daemon` for administrative subcommands.
func newDaemonCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "daemon <command>",
Short: "Manage the daemon as a system service",
Long: `Install, control and inspect the unarr daemon as a system service.
Linux: systemd user service (~/.config/systemd/user/unarr.service)
macOS: launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist)
Windows: Task Scheduler task (runs at logon)`,
Example: ` unarr daemon install
unarr daemon start
unarr daemon status
unarr daemon logs -f
unarr daemon reload
unarr daemon restart
unarr daemon stop
unarr daemon uninstall`,
}
cmd.AddCommand(
newDaemonInstallCmdReal(),
newDaemonUninstallCmdReal(),
newDaemonStartCmd(),
newDaemonStopCmd(),
newDaemonRestartCmd(),
newDaemonSvcStatusCmd(),
newDaemonLogsCmd(),
newDaemonReloadCmd(),
)
return cmd
}
func runDaemonStart() error {
cfg := loadConfig()
bold := color.New(color.Bold)
// Validate config
if cfg.Auth.APIKey == "" {
return fmt.Errorf("no API key configured — run 'unarr init' first")
}
if cfg.Agent.ID == "" {
return fmt.Errorf("no agent ID — run 'unarr init' first")
}
if cfg.Download.Dir == "" {
return fmt.Errorf("no download directory — run 'unarr init' first")
}
// Validate configured paths are safe
if err := cfg.ValidatePaths(); err != nil {
return fmt.Errorf("unsafe configuration: %w", err)
}
// Ensure download dir exists
if err := os.MkdirAll(cfg.Download.Dir, 0o755); err != nil {
return fmt.Errorf("create download dir: %w", err)
}
// Clean up stale resume files (>7 days old)
resumeDir := filepath.Join(config.DataDir(), "resume")
if removed := download.CleanStaleFiles(resumeDir, 7*24*time.Hour); removed > 0 {
log.Printf("Cleaned %d stale resume file(s)", removed)
}
fmt.Println()
bold.Println(" unarr Daemon")
fmt.Println()
userAgent := "unarr/" + Version
// Probe HW accel + derive a sensible transcode resolution cap. The cap
// is what the web side uses to decide whether the user should pre-empt
// transcoding by downloading a smaller version (4K source on a software
// libx264-only host is the canonical case where pre-download wins).
//
// Use the full diagnostic (encoders + devices + ffmpeg version) instead
// of just the picked backend — the extra fields ride along in the
// register payload so the web "Diagnose transcoder" modal can show *why*
// libx264 was selected on a host with a GPU (e.g. brew's ffmpeg without
// --enable-nvenc). 10 s ceiling so a hung ffmpeg binary can't stall
// startup forever.
ffmpegResolved, _ := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath)
probeCtx, probeCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer probeCancel() // guard against a panic inside DetectHWAccelDiagnostic
hwDiag := engine.DetectHWAccelDiagnostic(probeCtx, ffmpegResolved)
log.Println(hwDiag.LogLine())
hwAccelPick := hwDiag.Pick
maxTranscodeHeight := 1080
if hwAccelPick != engine.HWAccelNone {
maxTranscodeHeight = 2160
}
// Create daemon config
daemonCfg := agent.DaemonConfig{
AgentID: cfg.Agent.ID,
AgentName: cfg.Agent.Name,
Version: Version,
DownloadDir: cfg.Download.Dir,
StreamPort: cfg.Download.StreamPort,
LanIP: engine.LanIP(),
TailscaleIP: engine.TailscaleIP(),
CanDelete: cfg.Library.AllowDelete,
ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath),
HWAccel: string(hwAccelPick),
MaxTranscodeHeight: maxTranscodeHeight,
FFmpegVersion: hwDiag.FFmpegVersion,
FFmpegPath: hwDiag.FFmpegPath,
HWEncoders: hwDiag.Encoders,
HWDevices: hwDiag.Devices,
AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(),
}
// Create HTTP client with mirror failover so a `.com` block-out rolls
// over to `.to` / .onion without restarting the daemon.
agentClient := newAgentClientFromConfig(cfg, userAgent)
log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors))
// Create daemon
d := agent.NewDaemon(daemonCfg, agentClient)
// Start SIGUSR1 reload watcher (unix only, no-op on Windows)
startReloadWatcher(&ReloadableConfig{Daemon: d})
// Daemon-scoped context — cancelled on shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Parse speed limits
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed)
// Parse torrent timeouts
metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout)
stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout)
// Create progress reporter — only used for stream tasks (handleStreamTask)
// The sync goroutine handles all regular progress reporting.
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
if statusInterval == 0 {
statusInterval = 3 * time.Second
}
reporter := engine.NewProgressReporter(agentClient, statusInterval)
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
// Managed-VPN add-on: bring up the in-process WireGuard split-tunnel before
// the torrent client so peer + tracker traffic routes through it. Failure is
// non-fatal — log and download in the clear (better than refusing to run).
var vpnTunnel *vpn.Tunnel
if cfg.Download.VPN.ConfigFile != "" {
// Self-hosted / personal-VPN mode: read a local .conf directly.
raw, rerr := os.ReadFile(cfg.Download.VPN.ConfigFile)
if rerr != nil {
log.Printf("[vpn] could not read config_file %q (%v) — downloading in the clear", cfg.Download.VPN.ConfigFile, rerr)
} else if t, uerr := vpn.Up(string(raw)); uerr != nil {
log.Printf("[vpn] tunnel failed to start from config_file (%v) — downloading in the clear", uerr)
} else {
vpnTunnel = t
defer vpnTunnel.Close()
log.Printf("[vpn] managed VPN active (local config_file) — torrent traffic split-tunnelled through WireGuard")
}
} else if cfg.Download.VPN.Enabled {
apiURL := cfg.Auth.APIURL
if apiURL == "" {
apiURL = "https://torrentclaw.com"
}
fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, false)
cancel()
var fe *vpn.FetchError
switch {
case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice:
log.Printf("[vpn] the single WireGuard slot is already held by another unarr agent — this one downloads in the clear. To protect this machine too, set up OpenVPN on it (1 agent uses WireGuard, the rest use OpenVPN — up to 10). See https://torrentclaw.com/vpn")
case ferr != nil:
log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr)
default:
if t, uerr := vpn.Up(conf); uerr != nil {
log.Printf("[vpn] tunnel failed to start (%v) — downloading in the clear", uerr)
} else {
vpnTunnel = t
defer vpnTunnel.Close()
log.Printf("[vpn] managed VPN active — torrent traffic split-tunnelled through WireGuard")
}
}
}
// Record VPN split-tunnel state for `unarr vpn status`.
if vpnTunnel != nil {
mode := "managed"
if cfg.Download.VPN.ConfigFile != "" {
mode = "self-hosted"
}
d.SetVPNState(true, mode, vpnTunnel.Endpoint)
}
// Create torrent downloader
torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{
DataDir: cfg.Download.Dir,
PieceCompletionDir: config.DataDir(), // keep piece-completion DB off NFS/SMB mounts
MetadataTimeout: metaTimeout,
StallTimeout: stallTimeout,
MaxTimeout: 0,
MaxDownloadRate: maxDl,
MaxUploadRate: maxUl,
ListenPort: cfg.Download.ListenPort,
SeedEnabled: false,
VPNTunnel: vpnTunnel,
})
if err != nil {
return fmt.Errorf("create torrent downloader: %w", err)
}
if maxDl > 0 || maxUl > 0 {
dlStr, ulStr := "unlimited", "unlimited"
if maxDl > 0 {
dlStr = formatSpeedLog(maxDl)
}
if maxUl > 0 {
ulStr = formatSpeedLog(maxUl)
}
log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr)
}
// Create debrid downloader
debridDl := engine.NewDebridDownloader()
// Create download manager
manager := engine.NewManager(engine.ManagerConfig{
MaxConcurrent: cfg.Download.MaxConcurrent,
OutputDir: cfg.Download.Dir,
Notifications: cfg.Notifications.Enabled,
Organize: engine.OrganizeConfig{
Enabled: cfg.Organize.Enabled,
MoviesDir: cfg.Organize.MoviesDir,
TVShowsDir: cfg.Organize.TVShowsDir,
OutputDir: cfg.Download.Dir,
},
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(agentClient))
// Create persistent stream server
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP)
streamSrv.SetRequireStreamToken(cfg.Download.RequireStreamToken)
// Report the stream-token signing key ONLY when enforcing, so the web's
// "secret present → mint HLS token" signal accurately means "this agent
// verifies tokens". Reporting it with enforcement off would make the web
// mint HLS path tokens the agent never peels → 404. Set before Register().
if cfg.Download.RequireStreamToken {
d.UpdateStreamSecret(streamSrv.StreamSecretHex())
}
// CORS extras = operator config + dynamic mirror list from /api/mirrors.
// Without the mirror merge, a user playing from `torrentclaw.to` (or any
// future mirror) hits the daemon, gets 200 + body, but no
// `Access-Control-Allow-Origin` → browser drops the response → player
// reports "404 todos los canales". Fetching /api/mirrors at startup
// future-proofs against mirror additions without a CLI rebuild.
corsExtras := append([]string(nil), cfg.Download.CORSExtraOrigins...)
corsExtras = append(corsExtras, mirrorCORSOrigins(ctx, cfg, userAgent)...)
streamSrv.SetCORSAllowedOrigins(corsExtras)
// Reap HLS tmpdirs left over from a previous daemon run before we start
// accepting new sessions. The in-memory registry doesn't survive a
// restart, so without this disk usage grows unbounded across restarts.
if err := engine.CleanupHLSOrphanDirs(); err != nil {
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
}
// Persistent HLS segment cache — survives across sessions so re-plays
// of the same file at the same quality skip ffmpeg entirely. Off when
// hls_cache.enabled = false; size cap from hls_cache.size_gb; path from
// hls_cache.dir (defaults to ~/.cache/unarr/hls-cache).
var hlsCache *engine.HLSCache
if cfg.Download.HLSCache.Enabled {
cacheDir := cfg.Download.HLSCache.Dir
if cacheDir == "" {
if base, err := os.UserCacheDir(); err == nil {
cacheDir = filepath.Join(base, "unarr", "hls-cache")
} else {
cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache")
}
}
c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB)
if err != nil {
log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err)
} else {
hlsCache = c
hlsCache.StartSweeper(ctx, time.Hour)
log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB)
}
} else {
log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch")
}
if err := streamSrv.Listen(ctx); err != nil {
return fmt.Errorf("start stream server: %w", err)
}
d.UpdateStreamPort(streamSrv.Port())
// CloudFlare Quick Tunnel — needs the ACTUAL listening port (the
// configured port may have been busy and bumped). Spawning here ensures
// cloudflared --url points at the right socket. Failures degrade to
// Tailscale/LAN only; the supervisor keeps the tunnel up across CF's
// periodic rotation + transient cloudflared crashes.
if cfg.Download.Funnel.Enabled {
go superviseFunnel(ctx, d, streamSrv.Port())
}
// Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing.
// HLS sessions get rejected at runtime (see daemon.go ~line 455), but
// surfacing it here gives the operator a chance to install ffmpeg before
// a user hits a confusing "rejected" line in the logs.
if cfg.Download.Transcode.Enabled {
if _, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err != nil {
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
} else if _, err := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath); err != nil {
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
}
}
// Wire sync client callbacks
sc := d.SyncClient()
sc.GetFreeSlots = manager.FreeSlots
sc.GetTaskStates = manager.TaskStates
d.GetActiveCount = manager.ActiveCount
// Trigger immediate sync when a download slot frees up
manager.OnTaskDone = func() { d.TriggerSync() }
// Wire: sync receives new tasks → submit to manager or handle stream
d.OnTasksClaimed = func(tasks []agent.Task) {
for _, t := range tasks {
if t.Mode == "stream" {
if isStreamingTask(t.ID) {
continue
}
cancelStreamContexts()
streamSrv.ClearFile()
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry
streamRegistry.mu.Lock()
streamRegistry.cancels[t.ID] = streamCancel
streamRegistry.mu.Unlock()
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
} else {
manager.Submit(ctx, t)
}
}
}
// Wire: sync receives control signals → act on manager
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
switch action {
case "cancel":
if deleteFiles {
manager.CancelAndDeleteFiles(taskID)
} else {
manager.CancelTask(taskID)
}
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "pause":
manager.PauseTask(taskID)
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "resume":
log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID))
d.TriggerSync()
case "stream":
if streamSrv.CurrentTaskID() == taskID {
return
}
task := manager.GetTask(taskID)
if task == nil || task.GetStreamURL() != "" {
return
}
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err)
return
}
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+taskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
case "stop-stream":
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
}
}
// Wire: sync receives file deletion requests from the server
if cfg.Library.AllowDelete && len(daemonCfg.ScanPaths) > 0 {
sc.OnDeleteFiles = func(items []agent.LibraryDeleteRequest) []int {
return library.DeleteFiles(items, daemonCfg.ScanPaths)
}
}
// Wire: sync receives stream requests for completed downloads
d.OnStreamRequested = func(sr agent.StreamRequest) {
if streamSrv.CurrentTaskID() == sr.TaskID {
// Already serving — notify server it's ready
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
filePath := filepath.Clean(sr.FilePath)
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
info, err := os.Stat(filePath)
if err != nil {
log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
filePath = found
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
}
cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
}
// Wire: sync receives HLS streaming session requests. Each session spawns
// one ffmpeg process and registers its HLS playlist with the StreamServer.
// Validate FilePath against allowed dirs to prevent path traversal abuse
// from a compromised server.
d.OnStreamSession = func(sess agent.StreamSession) {
if playerSessionRegistry.has(sess.SessionID) {
return // already running
}
// Debrid direct-play (hueco #2 / 2a): the source has no local file — the
// web resolved an HTTPS debrid link (cache-confirmed, browser-native
// container) and the daemon streams /stream from it via ranged GETs.
// Runs BEFORE the filePath checks (there is no local path) and needs no
// ffmpeg. Provider setup does a HEAD, so hand it off to a goroutine to
// keep the sync loop from blocking other pending actions; register the
// session up front so a duplicate sync within the setup window is a
// no-op (matches the HLS branch's handoff rationale).
if sess.DirectURL != "" {
playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() })
go func() {
bctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize)
if perr != nil {
playerSessionRegistry.remove(sess.SessionID)
log.Printf("[stream %s] debrid provider failed: %v", agent.ShortID(sess.SessionID), perr)
return
}
streamSrv.SetFile(provider, sess.TaskID)
log.Printf("[stream %s] debrid direct-play: %s (%d bytes)",
agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize())
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
defer rcancel()
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
}
}()
return
}
filePath := sess.FilePath
if filePath == "" {
log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID))
return
}
filePath = filepath.Clean(filePath)
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
log.Printf("[hls %s] rejected: path outside allowed dirs: %s",
agent.ShortID(sess.SessionID), filePath)
return
}
// Resolve directory → first video file (matches StreamRequest behavior).
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[hls %s] rejected: no video file in dir %s",
agent.ShortID(sess.SessionID), filePath)
return
}
filePath = found
}
// Direct-play (hueco #3 / 3a): the web decided this source is already
// browser-native (mp4 h264/aac 8-bit SDR) from library scan metadata,
// gated on agent version. Serve the raw file over /stream (HTTP Range,
// no ffmpeg) instead of transcoding to HLS — zero CPU, instant seek.
// Runs BEFORE the ffmpeg-availability check on purpose: direct-play
// needs no ffmpeg, so it must work even when transcode is disabled.
if sess.PlayMethod == "direct" {
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sess.TaskID)
// cancel just clears the served file so daemon shutdown / drain
// stops exposing it on /stream. There's no ffmpeg child to kill.
playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() })
log.Printf("[stream %s] direct-play: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
// File is on disk → ready immediately. Tell the web so the player
// attaches <video src> without burning its HEAD-probe retry budget.
go func() {
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
}
}()
return
}
tcRuntime := buildTranscodeRuntime(ctx, cfg)
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID))
return
}
// Remux path (hueco #3 / 3b): codecs are browser-native (h264/aac) but
// the container isn't (mkv). ffmpeg `-c copy` → growing fMP4 served raw
// over /stream — no video re-encode, no HLS. The web decided this from
// scan metadata + version gate; we still need ffmpeg (copy uses it).
if sess.PlayMethod == "remux" {
tStart := time.Now()
probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second)
probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath)
cancelProbe()
if perr != nil {
log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr)
return
}
tProbe := time.Now()
remuxCtx, remuxCancel := context.WithCancel(ctx)
src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName)
if serr != nil {
remuxCancel()
log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr)
return
}
streamSrv.SetGrowingFile(src, sess.TaskID)
// cancel stops the ffmpeg copy; SetGrowingFile/ClearFile also Close()
// the source, so the temp file is always cleaned up.
playerSessionRegistry.add(sess.SessionID, func() { remuxCancel(); streamSrv.ClearFile() })
// Startup timing (TTFF diagnosis): probe = ffprobe on the source;
// spawn = ffmpeg launch + tmp setup. First-fMP4-byte is logged by the
// source itself; serveGrowing logs any client read that blocks waiting
// for ffmpeg to catch up.
log.Printf("[stream %s] remux (copy) → fMP4: %s [probe=%v spawn=%v]",
agent.ShortID(sess.SessionID), filepath.Base(filePath),
tProbe.Sub(tStart).Round(time.Millisecond), time.Since(tProbe).Round(time.Millisecond))
go func() {
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
}
}()
return
}
hlsCtx, hlsCancel := context.WithCancel(ctx)
playerSessionRegistry.add(sess.SessionID, hlsCancel)
hlsCfg := engine.HLSSessionConfig{
SessionID: sess.SessionID,
SourcePath: filePath,
FileName: sess.FileName,
Quality: sess.Quality,
AudioIndex: sess.AudioIndex,
Transcode: tcRuntime,
Cache: hlsCache,
}
// StartHLSSession runs ffprobe (15 s cap, typical 0.31 s) before
// returning. Doing this synchronously inside the sync handler holds
// the next sync HTTP cycle until ffprobe is done, so any other
// pending actions (new tasks, deletes) wait too. Hand it off so
// the sync loop returns immediately — browser HEAD probes already
// have a 30 s retry budget that absorbs the gap until
// `streamSrv.HLS().Register` lands.
go func() {
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
if err != nil {
playerSessionRegistry.remove(sess.SessionID)
hlsCancel()
log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err)
return
}
streamSrv.HLS().Register(hsess)
// Tell the server seg-0 is on disk as soon as it lands so the
// player's SSE subscription flips its "Preparando…" UI without
// waiting for the browser HEAD-probe loop to discover it
// independently. Cache-HIT sessions are ready immediately.
go watchSessionReady(hlsCtx, agentClient, hsess, sess.SessionID)
}()
}
// Periodic DHT node persistence (every 5 min)
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
torrentDl.SaveDhtNodes()
case <-ctx.Done():
return
}
}
}()
// Periodic HLS session sweeper (every 5 min). Closes sessions whose last
// segment fetch was over 30 min ago — kills the orphan ffmpeg + removes
// the per-session tmpdir, so a tab that died mid-stream doesn't leak
// disk space until daemon shutdown.
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if n := streamSrv.HLS().SweepIdle(); n > 0 {
log.Printf("[hls] swept %d idle session(s)", n)
}
case <-ctx.Done():
return
}
}
}()
// Start auto-scan goroutine
scanPaths := daemonCfg.ScanPaths
if len(scanPaths) > 0 && cfg.Library.AutoScan {
scanInterval := 24 * time.Hour
if cfg.Library.ScanInterval != "" {
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
scanInterval = parsed
}
}
go runAutoScan(ctx, cfg, scanInterval, agentClient, d.ScanNow, scanPaths)
}
// Start reporter only for stream task handling
go reporter.Run(ctx)
// Start daemon (blocks — runs sync loop)
errCh := make(chan error, 1)
go func() {
errCh <- d.Run(ctx)
}()
// Start idle guard for the persistent stream server
go startIdleGuard(ctx, streamSrv)
// Signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Wait for signal or error
select {
case sig := <-sigCh:
fmt.Printf("\n Received %s, shutting down...\n", sig)
cancelStreamContexts()
cancelAllPlayerSessions()
streamSrv.Shutdown(context.Background())
cancel()
// Give active downloads 30s to finish
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
manager.Shutdown(shutdownCtx)
d.Deregister()
fmt.Println(" Daemon stopped.")
return nil
case err := <-errCh:
cancelStreamContexts()
cancelAllPlayerSessions()
streamSrv.Shutdown(context.Background())
cancel()
return err
}
}
// isAllowedStreamPath checks that filePath is within one of the directories
// the daemon is configured to manage. This defends against a compromised API
// server sending a path traversal payload (e.g. /etc/passwd) in StreamRequest.
// isAllowedStreamPath reports whether filePath is contained within one of the
// allowedDirs. filePath must already be cleaned (filepath.Clean) by the caller.
// This defends against a compromised API server sending a path traversal payload.
func isAllowedStreamPath(filePath string, allowedDirs ...string) bool {
for _, dir := range allowedDirs {
if dir == "" {
continue
}
rel, err := filepath.Rel(filepath.Clean(dir), filePath)
if err == nil && !strings.HasPrefix(rel, "..") {
return true
}
}
return false
}
func formatSpeedLog(bps int64) string {
switch {
case bps >= 1024*1024*1024:
return fmt.Sprintf("%.1f GB/s", float64(bps)/(1024*1024*1024))
case bps >= 1024*1024:
return fmt.Sprintf("%.1f MB/s", float64(bps)/(1024*1024))
case bps >= 1024:
return fmt.Sprintf("%.0f KB/s", float64(bps)/1024)
default:
return fmt.Sprintf("%d B/s", bps)
}
}
// runAutoScan runs a library scan + sync on a timer or on-demand via scanNow channel.
// It scans all provided paths and syncs each independently so stale-item cleanup
// is scoped to the correct directory prefix on the server.
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}, scanPaths []string) {
log.Printf("[auto-scan] enabled: every %s, paths: %v", interval, scanPaths)
select {
case <-time.After(30 * time.Second):
case <-scanNow:
case <-ctx.Done():
return
}
doScan := func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[auto-scan] panic recovered: %v", r)
}
}()
log.Printf("[auto-scan] starting scan of %v", scanPaths)
existing, _ := library.LoadCache()
workers := cfg.Library.Workers
if workers == 0 {
workers = 8
}
scanOpts := library.ScanOptions{
Workers: workers,
FFprobePath: cfg.Library.FFprobePath,
Incremental: existing != nil,
}
// Scan each path independently and sync per path so the server can
// scope stale-item deletion to the correct directory prefix.
const batchSize = 100
totalSynced := 0
var mergedItems []library.LibraryItem
for _, scanPath := range scanPaths {
cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
if err != nil {
log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
continue
}
mergedItems = append(mergedItems, cache.Items...)
items := library.BuildSyncItems(cache)
if len(items) == 0 {
log.Printf("[auto-scan] no items under %s", scanPath)
continue
}
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
isLast := end >= len(items)
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
Items: items[i:end],
ScanPath: scanPath,
IsLastBatch: isLast,
SyncStartedAt: syncStartedAt,
})
if err != nil {
log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err)
break
}
}
totalSynced += len(items)
}
// Save merged cache for incremental scanning next time.
if len(mergedItems) > 0 {
mergedCache := &library.LibraryCache{
ScannedAt: time.Now().UTC().Format(time.RFC3339),
Path: scanPaths[0],
Items: mergedItems,
}
if err := library.SaveCache(mergedCache); err != nil {
log.Printf("[auto-scan] save cache failed: %v", err)
}
}
log.Printf("[auto-scan] synced %d items across %d path(s)", totalSynced, len(scanPaths))
}
doScan()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
doScan()
case <-scanNow:
log.Printf("[auto-scan] on-demand scan triggered")
ticker.Reset(interval)
doScan()
case <-ctx.Done():
return
}
}
}
// superviseFunnel keeps a CloudFlare Quick Tunnel up across cloudflared
// crashes and CF's ~6h tunnel rotation. On a clean exit (cancellation) it
// returns; on a crash it clears the reported URL and respawns with an
// exponential backoff so we don't hammer cloudflared into a tight loop when
// it can't reach the CF edge.
func superviseFunnel(ctx context.Context, d *agent.Daemon, port int) {
backoff := 2 * time.Second
const maxBackoff = 5 * time.Minute
for ctx.Err() == nil {
t, err := funnel.Start(ctx, funnel.Config{Port: port})
if err != nil {
log.Printf("[funnel] could not start CloudFlare tunnel (%v) — retrying in %s", err, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}
backoff = min(backoff*2, maxBackoff)
continue
}
log.Printf("[funnel] cloudflared started, waiting for public URL...")
go func() {
url, werr := t.WaitURL(45 * time.Second)
if werr != nil {
log.Printf("[funnel] cloudflared did not emit a URL (%v)", werr)
return
}
log.Printf("[funnel] public URL: %s", url)
d.SetFunnelURL(url)
}()
// Block until cloudflared exits (CF rotation, crash, or shutdown).
exitErr := <-t.Done()
_ = t.Close()
d.SetFunnelURL("")
if ctx.Err() != nil {
return
}
if exitErr != nil {
log.Printf("[funnel] cloudflared exited: %v — restarting in %s", exitErr, backoff)
} else {
log.Printf("[funnel] cloudflared exited cleanly — restarting in %s", backoff)
}
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}
backoff = min(backoff*2, maxBackoff)
}
}
// mirrorCORSOrigins fetches /api/mirrors from the configured primary (+ extra
// mirror candidates + static IPFS fallback) and returns the discovered URLs as
// Origin strings. Best-effort: any failure logs a warning and returns an empty
// slice; the static defaultCORSAllowedOrigins in validate.go covers the known
// mirrors (.com / .to / built-in onion) so the daemon still accepts the
// official surfaces when this call fails.
//
// Bounded to a short timeout so a slow /api/mirrors response can't delay
// daemon startup — every second here is a second the user can't play.
func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent string) []string {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
defer cancel()
candidates := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...)
resp, err := agent.FetchMirrorsWithFallback(ctx, candidates, userAgent)
if err != nil {
log.Printf("[cors] mirror discovery failed (%v) — using static allowlist only", err)
return nil
}
seen := make(map[string]struct{})
out := make([]string, 0, len(resp.Mirrors))
add := func(rawURL string) {
if rawURL == "" {
return
}
origin := strings.TrimRight(rawURL, "/")
if _, dup := seen[origin]; dup {
return
}
seen[origin] = struct{}{}
out = append(out, origin)
}
for _, m := range resp.Mirrors {
add(m.URL)
}
if resp.Tor != nil {
add(resp.Tor.URL)
}
if len(out) > 0 {
log.Printf("[cors] merged %d mirror origins from /api/mirrors", len(out))
}
return out
}
// watchSessionReady polls HLSSession.ReadyCount until the first segment +
// init.mp4 are on disk, then POSTs /api/internal/agent/session-ready so
// the web side flips streaming_session.ready_at — which its SSE endpoint
// pushes to subscribed players. Cache-HIT sessions are ready the moment
// StartHLSSession returns and POST immediately.
//
// Bounded by a 60 s deadline so a permanently stuck encoder doesn't keep
// a goroutine alive forever; if seg-0 never lands the player falls back
// to its existing HEAD-probe retry path anyway.
func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.HLSSession, sessionID string) {
deadline := time.Now().Add(60 * time.Second)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
// Session torn down through a path that didn't cancel ctx (registry
// replace, idle sweep, internal kill). Bail before polling further —
// without this check the watcher could keep alive for up to 60 s on
// a dead HLSSession that's never going to become ready.
if hsess.IsClosed() {
return
}
// Cache HIT or seg-0 ready → notify + done.
if hsess.FromCache() || hsess.ReadyCount() >= 1 {
// Parent ctx so a session cancel mid-POST (user closed tab,
// daemon shutdown) tears down the in-flight webhook instead of
// blocking the goroutine for up to 10 s on a now-orphan call.
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
if err := client.MarkSessionReady(rctx, sessionID); err != nil {
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
}
cancel()
return
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if time.Now().After(deadline) {
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
return
}
}
}