diff --git a/internal/agent/client.go b/internal/agent/client.go index e7f2c37..f1014c5 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -130,6 +130,27 @@ func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error { return nil } +// RefreshStreamURL re-resolves a fresh debrid direct URL for a live streaming +// session (hueco #2 / 2c). Called by the daemon when a debrid source expires +// mid-stream (the link is time-limited; the content is still cached). Returns +// the new URL on success; an error (incl. 409/410) means refresh isn't +// possible and the caller should stop trying. +func (c *Client) RefreshStreamURL(ctx context.Context, sessionID string) (string, error) { + req := struct { + SessionID string `json:"sessionId"` + }{SessionID: sessionID} + var resp struct { + DirectURL string `json:"directUrl"` + } + if err := c.doPost(ctx, "/api/internal/agent/stream-url", req, &resp); err != nil { + return "", fmt.Errorf("refresh stream url: %w", err) + } + if resp.DirectURL == "" { + return "", fmt.Errorf("refresh stream url: empty url in response") + } + return resp.DirectURL, nil +} + // ReportStatus reports download progress. Returns server-side flags the CLI must act on. func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) { var resp StatusResponse diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 0561917..8d31fe7 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -598,10 +598,16 @@ func runDaemonStart() error { // no-op (matches the HLS branch's handoff rationale). if sess.DirectURL != "" && sess.PlayMethod != "hls" { playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() }) + // refresh re-resolves a fresh debrid link when this one expires + // mid-stream (hueco #2 / 2c). Bound to the daemon ctx so a shutdown + // cancels an in-flight refresh. + refresh := func(rctx context.Context) (string, error) { + return agentClient.RefreshStreamURL(rctx, sess.SessionID) + } go func() { bctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize) + provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh) if perr != nil { playerSessionRegistry.remove(sess.SessionID) log.Printf("[stream %s] debrid provider failed: %v", agent.ShortID(sess.SessionID), perr) @@ -640,6 +646,11 @@ func runDaemonStart() error { AudioIndex: sess.AudioIndex, Transcode: tcRuntime, Cache: hlsCache, + // 2c: refresh the debrid link if it expires mid-transcode; the + // auto-restart supervisor calls this before relaunching ffmpeg. + RefreshURL: func(rctx context.Context) (string, error) { + return agentClient.RefreshStreamURL(rctx, sess.SessionID) + }, }, hlsCtx, hlsCancel) log.Printf("[hls %s] debrid HLS-from-URL: %s", agent.ShortID(sess.SessionID), sess.FileName) return diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 9d05a30..12bad0e 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -141,7 +141,12 @@ type HLSSessionConfig struct { // CacheID overrides the cache key identity. Empty → key by SourcePath (local // files). Set to a stable id (the torrent info_hash) for SourceURL sessions // so re-plays cache-hit even though the debrid URL changes each resolution. - CacheID string + CacheID string + // RefreshURL, when set (debrid URL sessions only), re-resolves a fresh + // SourceURL when the current link expires mid-transcode (hueco #2 / 2c). + // The auto-restart supervisor calls it before relaunching ffmpeg so the + // restart uses a live link instead of retrying the dead one. nil = no refresh. + RefreshURL func(context.Context) (string, error) FileName string Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|"" AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default. @@ -204,6 +209,13 @@ type HLSSession struct { ffmpegSegStart int // index of the first segment the current ffmpeg writes restartCount int // bounded auto-restart counter (resets on Close) lastRestartAt time.Time + // liveURL is the mutable debrid source URL (hueco #2 / 2c). Initialised to + // cfg.SourceURL; refreshed in place by waitFFmpeg when the link expires. + // Guarded by mu because restartFromSegment reads it from BOTH the supervisor + // goroutine (auto-restart) AND the HTTP handler goroutine (seek-restart), + // while waitFFmpeg writes it. Empty for local-file sessions. cfg itself is + // treated as immutable after construction so copying it stays race-free. + liveURL string // readyCh + readyMax track how many segments ffmpeg has finished writing. // readyMax is a COUNT (not an index): readyMax=N means seg-0 … seg-(N-1) @@ -444,6 +456,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er cacheKey: cacheKey, fromCache: fromCache, writerLockHeld: writerLockHeld, + liveURL: cfg.SourceURL, // mutable copy; cfg stays immutable } s.manifestVideo = renderVideoPlaylist(probe.DurationSec, segCount) s.manifestRoot = renderMasterPlaylist(probe, cfg.Quality) @@ -483,9 +496,14 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er if len(probe.SubtitleTracks) > 0 { s.subsDone = make(chan struct{}) + // Capture the source ref now (by value): subs are extracted once at + // startup, and a later URL refresh (2c) mutates s.cfg.SourceURL from the + // waitFFmpeg goroutine — passing the URL in keeps extractSubtitles from + // racing that write. + subSrc := cfg.sourceRef() go func() { defer close(s.subsDone) - s.extractSubtitles(ffCtx) + s.extractSubtitles(ffCtx, subSrc) }() } @@ -750,6 +768,26 @@ func (s *HLSSession) waitFFmpeg() { s.lastRestartAt = time.Now() s.mu.Unlock() + // Debrid URL session (hueco #2 / 2c): the likeliest cause of an ffmpeg + // network exit is the debrid link expiring. Re-resolve a fresh one before + // restarting, else the restart just retries the dead URL and burns the + // retry budget. The network call runs lock-free; the result is stored in + // s.liveURL under s.mu because restartFromSegment reads it from the HTTP + // handler goroutine too (seek-restart), not just this supervisor goroutine. + if s.cfg.SourceURL != "" && s.cfg.RefreshURL != nil { + rctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + newURL, rerr := s.cfg.RefreshURL(rctx) + cancel() + if rerr != nil { + log.Printf("[hls %s] URL refresh before restart failed: %v", shortHLSID(s.cfg.SessionID), rerr) + } else { + s.mu.Lock() + s.liveURL = newURL + s.mu.Unlock() + log.Printf("[hls %s] debrid URL refreshed before restart", shortHLSID(s.cfg.SessionID)) + } + } + // Restart from the last segment we know is safely on disk. If readyMax // is 0 (never produced anything), retry from segment 0 — covers initial // startup failures on transient errors. @@ -1005,8 +1043,15 @@ func (s *HLSSession) restartFromSegment(targetIdx int) error { // Build args for the new ffmpeg with -ss offset. Segments are non-uniform // (seg-0 is hlsInitSegmentDuration s, the rest are hlsSegmentDuration s), // so use segmentStartSec for the seek time instead of multiplying. + // Use a local cfg copy carrying the live (possibly-refreshed) debrid URL, + // read under s.mu — this runs from the HTTP handler goroutine too, so it + // can't read s.liveURL unsynchronised while waitFFmpeg writes it (2c). startSec := segmentStartSec(targetIdx) - args := buildHLSFFmpegArgsAt(s.cfg, s.probe, s.tmpDir, targetIdx, startSec) + cfg := s.cfg + s.mu.Lock() + cfg.SourceURL = s.liveURL // "" for local-file sessions — no-op, sourceRef falls back to SourcePath + s.mu.Unlock() + args := buildHLSFFmpegArgsAt(cfg, s.probe, s.tmpDir, targetIdx, startSec) ffCtx, cancel := context.WithCancel(context.Background()) cmd := exec.CommandContext(ffCtx, s.cfg.Transcode.FFmpegPath, args...) @@ -1352,7 +1397,7 @@ func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir strin // extractSubtitles spawns short-lived ffmpeg jobs to convert each text-based // subtitle track to WebVTT in parallel. Bitmap subs (PGS, DVB) are skipped — // they would require burn-in into the video encode, which is out of scope. -func (s *HLSSession) extractSubtitles(ctx context.Context) { +func (s *HLSSession) extractSubtitles(ctx context.Context, src string) { subsDir := filepath.Join(s.tmpDir, "subs") for i, sub := range s.probe.SubtitleTracks { if !sub.IsTextSubtitle() { @@ -1361,7 +1406,7 @@ func (s *HLSSession) extractSubtitles(ctx context.Context) { out := filepath.Join(subsDir, fmt.Sprintf("sub-%d.vtt", i)) args := []string{ "-y", "-hide_banner", "-loglevel", "warning", - "-i", s.cfg.sourceRef(), + "-i", src, "-map", fmt.Sprintf("0:s:%d?", i), "-c:s", "webvtt", out, diff --git a/internal/engine/stream_source_debrid.go b/internal/engine/stream_source_debrid.go index 03b2c3a..ec74529 100644 --- a/internal/engine/stream_source_debrid.go +++ b/internal/engine/stream_source_debrid.go @@ -21,6 +21,7 @@ import ( "net/http" "path" "strings" + "sync" "time" ) @@ -47,7 +48,10 @@ var debridHTTPClient = &http.Client{ // Returns an error only when neither a HEAD size nor a fallback is available — // http.ServeContent needs a real size to range-serve, and serving size 0 would // hand the browser an empty file. -func NewDebridFileProvider(ctx context.Context, directURL, fileName string, fallbackSize int64) (FileProvider, error) { +// refresh, when non-nil, re-resolves a fresh debrid URL for the same content +// (hueco #2 / 2c) — called when the current link expires mid-stream. nil keeps +// 2a behaviour (an expired link is a hard error, no recovery). +func NewDebridFileProvider(ctx context.Context, directURL, fileName string, fallbackSize int64, refresh func(context.Context) (string, error)) (FileProvider, error) { if directURL == "" { return nil, errors.New("debrid provider: empty direct URL") } @@ -69,23 +73,95 @@ func NewDebridFileProvider(ctx context.Context, directURL, fileName string, fall name = debridNameFromURL(directURL) } return &debridFileProvider{ - url: directURL, - name: name, - size: size, + url: directURL, + name: name, + size: size, + refresh: refresh, }, nil } -// debridFileProvider serves a file from a debrid HTTPS URL via ranged GETs. +// debridFileProvider serves a file from a debrid HTTPS URL via ranged GETs. The +// URL is mutable: when it expires mid-stream, refreshURL swaps in a fresh one +// (shared across all readers this provider hands out) so the next range request +// uses the live link. type debridFileProvider struct { - url string + mu sync.Mutex + url string + lastRefreshAt time.Time + inflight *refreshCall // non-nil while a refresh is running; coalesces concurrent callers + refresh func(context.Context) (string, error) + name string size int64 } +// refreshCall is a single in-flight refresh whose result is shared by every +// reader that piles up behind it (singleflight). done is closed on completion. +type refreshCall struct { + done chan struct{} + url string + err error +} + +// currentURL returns the live debrid URL (mutated by refreshURL on expiry). +func (p *debridFileProvider) currentURL() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.url +} + +// refreshURL re-resolves a fresh debrid link and stores it. A browser's