From 992e16ba0596f2cac2a6085f1a56501a7938cd5b Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Sun, 31 May 2026 16:22:14 +0200 Subject: [PATCH] feat(stream): transcode debrid sources to HLS from a URL (hueco #2/2b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Non-browser-native debrid content (mkv/HEVC/…) can now stream: ffmpeg reads the debrid HTTPS link directly (-i ) and transcodes to HLS, instead of 2a's raw direct-play which only works for mp4/m4v. - HLSSessionConfig gains SourceURL + CacheID; sourceRef() feeds ffprobe, ffmpeg -i, and subtitle extraction from one place. HTTP-resilience flags (-reconnect*, -rw_timeout) are added only for a URL source; a seek-restart re-opens the URL with a Range request (-ss before -i = input seek). - Segment cache keys by CacheID (the torrent info_hash) for URL sessions so re-plays hit cache despite the debrid URL changing each resolution (KeyForID, no filepath.Abs). - OnStreamSession: the 2a direct-play branch is now gated on PlayMethod != "hls"; a new branch handles DirectURL + PlayMethod=="hls" → HLS-from-URL. The local-file and both debrid HLS paths share a startHLSPlayback helper. - ExtractMediaInfo no longer masks a URL probe failure as "file not found" (surfaces ffprobe's real stderr, e.g. "Protocol not found" on a TLS-less ffmpeg build). - Bump 0.11.0 -> 0.12.0 as the HLS-from-URL floor the web gates on. Validated e2e against real AllDebrid: a cached HEVC x265 mkv transcodes (h264_nvenc) from the debrid URL and plays 1080p in Chrome via hls.js, subtitles extracted from the remote mkv. --- internal/cmd/daemon.go | 84 ++++++++++++------ internal/cmd/version.go | 2 +- internal/engine/hls.go | 92 +++++++++++++++---- internal/engine/hls_cache.go | 11 ++- internal/engine/hls_url_args_test.go | 122 ++++++++++++++++++++++++++ internal/library/mediainfo/ffprobe.go | 10 ++- 6 files changed, 270 insertions(+), 51 deletions(-) create mode 100644 internal/engine/hls_url_args_test.go diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 61ade43..0561917 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -567,15 +567,36 @@ func runDaemonStart() error { return // already running } + // startHLSPlayback starts an HLS encode (local file or debrid URL) and + // wires it into the StreamServer. Shared by the local-file HLS path and + // the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe + // off the sync loop, and report readiness identically. + startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) { + playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel) + go func() { + hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) + if err != nil { + playerSessionRegistry.remove(hlsCfg.SessionID) + hlsCancel() + log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err) + return + } + streamSrv.HLS().Register(hsess) + go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID) + }() + } + // Debrid direct-play (hueco #2 / 2a): the source has no local file — the // web resolved an HTTPS debrid link (cache-confirmed, browser-native // container) and the daemon streams /stream from it via ranged GETs. // Runs BEFORE the filePath checks (there is no local path) and needs no - // ffmpeg. Provider setup does a HEAD, so hand it off to a goroutine to - // keep the sync loop from blocking other pending actions; register the + // ffmpeg. PlayMethod != "hls" distinguishes this from the debrid + // HLS-from-URL branch below (a non-native container the web wants + // transcoded). Provider setup does a HEAD, so hand it off to a goroutine + // to keep the sync loop from blocking other pending actions; register the // session up front so a duplicate sync within the setup window is a // no-op (matches the HLS branch's handoff rationale). - if sess.DirectURL != "" { + if sess.DirectURL != "" && sess.PlayMethod != "hls" { playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() }) go func() { bctx, cancel := context.WithTimeout(ctx, 15*time.Second) @@ -598,6 +619,32 @@ func runDaemonStart() error { return } + // Debrid HLS-from-URL (hueco #2 / 2b): the source is debrid-cached but + // NOT browser-native (mkv/HEVC/…), so the web set playMethod="hls" + // alongside the DirectURL. ffmpeg transcodes straight from the HTTP URL — + // no local file, no torrent. Cache is keyed by info_hash (not the + // per-resolution URL) so a re-play hits the segment cache. + if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above) + tcRuntime := buildTranscodeRuntime(ctx, cfg) + if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { + log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable (debrid HLS)", agent.ShortID(sess.SessionID)) + return + } + hlsCtx, hlsCancel := context.WithCancel(ctx) + startHLSPlayback(engine.HLSSessionConfig{ + SessionID: sess.SessionID, + SourceURL: sess.DirectURL, + CacheID: sess.InfoHash, + FileName: sess.FileName, + Quality: sess.Quality, + AudioIndex: sess.AudioIndex, + Transcode: tcRuntime, + Cache: hlsCache, + }, hlsCtx, hlsCancel) + log.Printf("[hls %s] debrid HLS-from-URL: %s", agent.ShortID(sess.SessionID), sess.FileName) + return + } + filePath := sess.FilePath if filePath == "" { log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID)) @@ -693,9 +740,12 @@ func runDaemonStart() error { return } + // Local-file HLS (the original path). StartHLSSession runs ffprobe + // (15 s cap) inside startHLSPlayback's goroutine so the sync loop + // returns immediately — browser HEAD probes have a 30 s retry budget + // that absorbs the gap until the playlist registers. hlsCtx, hlsCancel := context.WithCancel(ctx) - playerSessionRegistry.add(sess.SessionID, hlsCancel) - hlsCfg := engine.HLSSessionConfig{ + startHLSPlayback(engine.HLSSessionConfig{ SessionID: sess.SessionID, SourcePath: filePath, FileName: sess.FileName, @@ -703,29 +753,7 @@ func runDaemonStart() error { AudioIndex: sess.AudioIndex, Transcode: tcRuntime, Cache: hlsCache, - } - // StartHLSSession runs ffprobe (15 s cap, typical 0.3–1 s) before - // returning. Doing this synchronously inside the sync handler holds - // the next sync HTTP cycle until ffprobe is done, so any other - // pending actions (new tasks, deletes) wait too. Hand it off so - // the sync loop returns immediately — browser HEAD probes already - // have a 30 s retry budget that absorbs the gap until - // `streamSrv.HLS().Register` lands. - go func() { - hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) - if err != nil { - playerSessionRegistry.remove(sess.SessionID) - hlsCancel() - log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) - return - } - streamSrv.HLS().Register(hsess) - // Tell the server seg-0 is on disk as soon as it lands so the - // player's SSE subscription flips its "Preparando…" UI without - // waiting for the browser HEAD-probe loop to discover it - // independently. Cache-HIT sessions are ready immediately. - go watchSessionReady(hlsCtx, agentClient, hsess, sess.SessionID) - }() + }, hlsCtx, hlsCancel) } // Periodic DHT node persistence (every 5 min) diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 3614aab..7bd49b1 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.11.0" +var Version = "0.12.0" diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 8e0868a..9d05a30 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -130,8 +130,18 @@ func CleanupHLSOrphanDirs() error { // HLSSessionConfig describes a single browser playback session driven by HLS. type HLSSessionConfig struct { - SessionID string + SessionID string + // Exactly one of SourcePath / SourceURL identifies the input. SourcePath is + // a local file; SourceURL is a remote HTTP(S) URL ffmpeg reads directly + // (hueco #2 / 2b — transcoding a debrid source that isn't browser-native). SourcePath string + // SourceURL, when set, is fed to ffmpeg/ffprobe as the input (-i ) with + // network-resilience flags. Takes priority over SourcePath. + SourceURL string + // CacheID overrides the cache key identity. Empty → key by SourcePath (local + // files). Set to a stable id (the torrent info_hash) for SourceURL sessions + // so re-plays cache-hit even though the debrid URL changes each resolution. + CacheID string FileName string Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|"" AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default. @@ -143,6 +153,29 @@ type HLSSessionConfig struct { Cache *HLSCache } +// sourceRef returns the ffmpeg/ffprobe input: the remote URL when set, else the +// local path. Used everywhere a `-i` argument or a probe target is needed so +// the local-file and debrid-URL paths share one code path. +func (cfg HLSSessionConfig) sourceRef() string { + if cfg.SourceURL != "" { + return cfg.SourceURL + } + return cfg.SourcePath +} + +// logName is a short, log-friendly source label. For local files it's the base +// name; for a URL source (no SourcePath) it prefers FileName over the raw URL +// (which would leak a query-string token into the logs). +func (cfg HLSSessionConfig) logName() string { + if cfg.SourcePath != "" { + return filepath.Base(cfg.SourcePath) + } + if cfg.FileName != "" { + return cfg.FileName + } + return "debrid-url" +} + // HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments. // // Seek behaviour: ffmpeg writes segments sequentially from `ffmpegSegStart`. @@ -298,8 +331,8 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er if !validSessionID.MatchString(cfg.SessionID) { return nil, errors.New("hls: invalid session id") } - if cfg.SourcePath == "" { - return nil, errors.New("hls: empty source path") + if cfg.SourcePath == "" && cfg.SourceURL == "" { + return nil, errors.New("hls: no source (neither path nor URL)") } if cfg.Transcode.FFmpegPath == "" || cfg.Transcode.FFprobePath == "" { return nil, errors.New("hls: ffmpeg/ffprobe not available") @@ -310,7 +343,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er // the goroutine that started the session forever and the user would // see the player phase stuck on "Preparando sesión". probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second) - probe, err := ProbeFile(probeCtx, cfg.Transcode.FFprobePath, cfg.SourcePath) + probe, err := ProbeFile(probeCtx, cfg.Transcode.FFprobePath, cfg.sourceRef()) cancelProbe() if err != nil { return nil, fmt.Errorf("hls: probe: %w", err) @@ -334,7 +367,13 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er writerLockHeld bool ) if cfg.Cache != nil { - cacheKey = cfg.Cache.KeyFor(cfg.SourcePath, cfg.Quality, cfg.AudioIndex) + // Debrid URL sessions key by CacheID (info_hash) so re-plays hit cache + // despite the URL changing each resolution; local files key by path. + if cfg.CacheID != "" { + cacheKey = cfg.Cache.KeyForID(cfg.CacheID, cfg.Quality, cfg.AudioIndex) + } else { + cacheKey = cfg.Cache.KeyFor(cfg.SourcePath, cfg.Quality, cfg.AudioIndex) + } // Integrity gate: HasComplete just stats the marker. If init.mp4 or // the last segment vanished (external rm, partial-disk failure), we // can't actually serve a HIT — drop the dir and re-encode. @@ -393,14 +432,14 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er segCount := segmentCountForDuration(probe.DurationSec) s := &HLSSession{ - cfg: cfg, - probe: probe, - tmpDir: tmpDir, - durationSec: probe.DurationSec, - segmentCount: segCount, - startedAt: time.Now(), - lastTouch: time.Now(), - readyCh: make(chan struct{}), + cfg: cfg, + probe: probe, + tmpDir: tmpDir, + durationSec: probe.DurationSec, + segmentCount: segCount, + startedAt: time.Now(), + lastTouch: time.Now(), + readyCh: make(chan struct{}), cache: cfg.Cache, cacheKey: cacheKey, fromCache: fromCache, @@ -420,7 +459,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er s.readyCh = nil s.readyMu.Unlock() log.Printf("[hls %s] cache HIT %s: %s, %.1fs, %d segs (quality=%s)", - shortHLSID(cfg.SessionID), cacheKey, filepath.Base(cfg.SourcePath), + shortHLSID(cfg.SessionID), cacheKey, cfg.logName(), probe.DurationSec, segCount, coalesce(cfg.Quality, "auto")) return s, nil } @@ -464,7 +503,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er presetNote = " preset=" + profile.Preset } log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s, encoder=%s accel=%s%s)%s", - shortHLSID(cfg.SessionID), filepath.Base(cfg.SourcePath), + shortHLSID(cfg.SessionID), cfg.logName(), probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"), profile.Codec, string(cfg.Transcode.HWAccel), presetNote, cachedNote) return s, nil @@ -1038,8 +1077,8 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) // (otherwise the two switches in buildHLSFFmpegArgsAt could silently drift // when adding a new backend). type EncoderProfile struct { - Codec string // ffmpeg encoder name (e.g. "h264_nvenc", "libx264") - Preset string // preset string, or "" when the codec has no preset knob + Codec string // ffmpeg encoder name (e.g. "h264_nvenc", "libx264") + Preset string // preset string, or "" when the codec has no preset knob DecodeHwAccel string // ffmpeg `-hwaccel` value (e.g. "cuda", "qsv", "vaapi"), or "" } @@ -1111,7 +1150,22 @@ func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir strin args = append(args, "-ss", strconv.FormatFloat(startSec, 'f', 3, 64)) } - args = append(args, "-i", cfg.SourcePath) + // Remote (debrid) input: make the HTTP read resilient. -reconnect* recovers + // from a dropped/idle connection (debrid CDNs close long-idle sockets); + // -rw_timeout (µs) bounds a stalled read so a hung CDN surfaces as a restart + // instead of a frozen player. A seek (-ss before -i) re-opens the URL with a + // Range request, which debrid supports. Flags are no-ops for local files, so + // only add them for a URL source. + if cfg.SourceURL != "" { + args = append(args, + "-reconnect", "1", + "-reconnect_streamed", "1", + "-reconnect_delay_max", "5", + "-rw_timeout", "30000000", + ) + } + + args = append(args, "-i", cfg.sourceRef()) if startSec > 0 { args = append(args, "-output_ts_offset", strconv.FormatFloat(startSec, 'f', 3, 64)) @@ -1307,7 +1361,7 @@ func (s *HLSSession) extractSubtitles(ctx context.Context) { out := filepath.Join(subsDir, fmt.Sprintf("sub-%d.vtt", i)) args := []string{ "-y", "-hide_banner", "-loglevel", "warning", - "-i", s.cfg.SourcePath, + "-i", s.cfg.sourceRef(), "-map", fmt.Sprintf("0:s:%d?", i), "-c:s", "webvtt", out, diff --git a/internal/engine/hls_cache.go b/internal/engine/hls_cache.go index f1bf918..eba61bb 100644 --- a/internal/engine/hls_cache.go +++ b/internal/engine/hls_cache.go @@ -162,6 +162,16 @@ func (c *HLSCache) KeyFor(sourcePath, quality string, audioIndex int) string { return hex.EncodeToString(h[:8]) // 16 hex chars — collision-safe enough for per-host cache } +// KeyForID derives a cache key from a caller-supplied stable identity instead +// of a filesystem path (hueco #2 / 2b). Used for debrid HLS-from-URL sessions: +// the debrid direct URL is re-resolved per play and would never cache-hit, so +// we key by the torrent info_hash — the same content always maps to the same +// key across plays. NOT run through filepath.Abs (an id/URL is not a path). +func (c *HLSCache) KeyForID(id, quality string, audioIndex int) string { + h := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", id, quality, audioIndex))) + return hex.EncodeToString(h[:8]) +} + // DirFor returns the on-disk directory for a cache key. Caller is responsible // for creating it. func (c *HLSCache) DirFor(key string) string { @@ -407,4 +417,3 @@ func (c *HLSCache) StartSweeper(ctx context.Context, interval time.Duration) { func (c *HLSCache) Invalidate(key string) error { return os.RemoveAll(c.DirFor(key)) } - diff --git a/internal/engine/hls_url_args_test.go b/internal/engine/hls_url_args_test.go new file mode 100644 index 0000000..94c0cb8 --- /dev/null +++ b/internal/engine/hls_url_args_test.go @@ -0,0 +1,122 @@ +package engine + +import ( + "strings" + "testing" +) + +// hueco #2 / 2b — buildHLSFFmpegArgsAt must feed a debrid URL straight to +// ffmpeg's -i with HTTP-resilience flags, and must NOT add those flags for a +// local file. +func TestBuildHLSFFmpegArgsFromURL(t *testing.T) { + const url = "https://cdn.debrid.it/dl/abc/Movie.mkv" + cfg := HLSSessionConfig{ + SessionID: "test", + SourceURL: url, + CacheID: "deadbeef", + Quality: "720p", + Transcode: TranscodeRuntime{ + FFmpegPath: "/usr/bin/ffmpeg", + FFprobePath: "/usr/bin/ffprobe", + HWAccel: HWAccelNone, + }, + } + probe := &StreamProbe{Width: 1920, Height: 1080, DurationSec: 100} + args := buildHLSFFmpegArgsAt(cfg, probe, "/tmp/tmpdir", 0, 0) + got := strings.Join(args, " ") + + for _, want := range []string{ + "-reconnect 1", + "-reconnect_streamed 1", + "-reconnect_delay_max 5", + "-rw_timeout 30000000", + "-i " + url, + } { + if !strings.Contains(got, want) { + t.Errorf("URL argv missing %q\n%s", want, got) + } + } +} + +// A seek (startSec>0) on a URL source must keep BOTH the -ss input seek AND the +// HTTP-resilience flags, so a seek-restart re-opens the URL with a Range request +// instead of re-downloading from zero. (-ss before -i = input seek.) +func TestBuildHLSFFmpegArgsFromURLWithSeek(t *testing.T) { + const url = "https://cdn.debrid.it/dl/abc/Movie.mkv" + cfg := HLSSessionConfig{ + SessionID: "test", + SourceURL: url, + CacheID: "deadbeef", + Quality: "720p", + Transcode: TranscodeRuntime{ + FFmpegPath: "/usr/bin/ffmpeg", + FFprobePath: "/usr/bin/ffprobe", + HWAccel: HWAccelNone, + }, + } + probe := &StreamProbe{Width: 1920, Height: 1080, DurationSec: 100} + got := strings.Join(buildHLSFFmpegArgsAt(cfg, probe, "/tmp/tmpdir", 5, 30), " ") + + for _, want := range []string{ + "-ss 30.000", // input seek before -i + "-reconnect 1", // resilience flags still present on a restart + "-rw_timeout 30000000", + "-i " + url, + "-output_ts_offset 30.000", // PTS shift so the manifest numbering holds + } { + if !strings.Contains(got, want) { + t.Errorf("seek+URL argv missing %q\n%s", want, got) + } + } + // -ss must come before -i (fast input seek, not slow output seek). + if strings.Index(got, "-ss 30.000") > strings.Index(got, "-i "+url) { + t.Errorf("-ss must precede -i for input seek:\n%s", got) + } +} + +func TestBuildHLSFFmpegArgsLocalNoNetworkFlags(t *testing.T) { + cfg := HLSSessionConfig{ + SessionID: "test", + SourcePath: "/tmp/test.mkv", + Quality: "720p", + Transcode: TranscodeRuntime{ + FFmpegPath: "/usr/bin/ffmpeg", + FFprobePath: "/usr/bin/ffprobe", + HWAccel: HWAccelNone, + }, + } + probe := &StreamProbe{Width: 1920, Height: 1080, DurationSec: 100} + got := strings.Join(buildHLSFFmpegArgsAt(cfg, probe, "/tmp/tmpdir", 0, 0), " ") + + if strings.Contains(got, "-reconnect") || strings.Contains(got, "-rw_timeout") { + t.Errorf("local source must not carry HTTP-resilience flags: %s", got) + } + if !strings.Contains(got, "-i /tmp/test.mkv") { + t.Errorf("local argv missing -i /tmp/test.mkv: %s", got) + } +} + +// sourceRef + cache-key identity: a URL session keys by CacheID, a local one by +// path. Guards the "re-plays of the same debrid content hit cache despite the +// URL changing" invariant. +func TestHLSSourceRefAndCacheID(t *testing.T) { + urlCfg := HLSSessionConfig{SourceURL: "https://cdn/x.mkv", CacheID: "hash1"} + if urlCfg.sourceRef() != "https://cdn/x.mkv" { + t.Errorf("sourceRef = %q, want the URL", urlCfg.sourceRef()) + } + localCfg := HLSSessionConfig{SourcePath: "/m/x.mkv"} + if localCfg.sourceRef() != "/m/x.mkv" { + t.Errorf("sourceRef = %q, want the path", localCfg.sourceRef()) + } + + c := &HLSCache{root: "/tmp/cache"} + // Same CacheID + quality + audio → same key regardless of the (volatile) URL. + k1 := c.KeyForID("hash1", "720p", -1) + k2 := c.KeyForID("hash1", "720p", -1) + if k1 != k2 { + t.Errorf("KeyForID not stable: %q != %q", k1, k2) + } + if c.KeyForID("hash2", "720p", -1) == k1 { + t.Error("KeyForID collision across distinct ids") + } +} diff --git a/internal/library/mediainfo/ffprobe.go b/internal/library/mediainfo/ffprobe.go index 5b33979..0118617 100644 --- a/internal/library/mediainfo/ffprobe.go +++ b/internal/library/mediainfo/ffprobe.go @@ -67,8 +67,14 @@ func ExtractMediaInfo(ctx context.Context, ffprobePath, filePath string) (*Media output, err := cmd.Output() if err != nil { - if _, statErr := os.Stat(filePath); statErr != nil { - return nil, fmt.Errorf("ffprobe: file not found: %s", filePath) + // A remote URL (debrid HLS-from-URL, hueco #2/2b) has no local file to + // stat — surface ffprobe's own stderr (e.g. "Protocol not found" when the + // ffmpeg build lacks TLS, or an HTTP error) instead of a misleading + // "file not found". Only treat a genuine local path as possibly-missing. + if !strings.Contains(filePath, "://") { + if _, statErr := os.Stat(filePath); statErr != nil { + return nil, fmt.Errorf("ffprobe: file not found: %s", filePath) + } } return nil, fmt.Errorf("ffprobe failed (file=%s): %s", filePath, stderr.String()) }