diff --git a/Docs/plans/unarr-agent-roadmap.md b/Docs/plans/unarr-agent-roadmap.md index 217ccc2..8bcceaa 100644 --- a/Docs/plans/unarr-agent-roadmap.md +++ b/Docs/plans/unarr-agent-roadmap.md @@ -53,8 +53,8 @@ Diseño por fases (2a direct-play / 2b HLS-desde-URL / 2c fallback) en el estado El path HLS **siempre re-encoda** (incluso mp4 h264/aac ya compatible). `DecideAction` (passthrough/remux) existe pero muerto en el path browser. Sin negociación por capacidades del dispositivo. Sin ABR multi-bitrate. -Diseño por fases (3a direct-play / 3b remux fMP4 / 3c capability-negotiation / 3d ABR) -en el estado abajo. **Fases 3a + 3b CERRADAS** (smoke e2e en browser); 3c/3d pendientes. +Diseño por fases (3a direct-play / 3b remux-HLS / 3c capability-negotiation / 3d ABR) +en el estado abajo. **Fase 3a CERRADA** (CLI c8d7c4b + web 636fbe59); 3b/3c/3d pendientes. ### Hueco #4 — Pre-transcode (transcode-on-download) 🔵 DISEÑADO (ver estado abajo) Al completar una descarga/import, transcodificar/remuxar en background para que el @@ -294,29 +294,6 @@ el orden de STREAMING (no el de descarga) prefiera debrid. `engine/transcoder.go` (args `-c copy` fMP4). WEB `lib/stream/play-method.ts` (+"remux"), `stream/session/route.ts`, `HlsStreamPlayer.tsx` (`!= "hls"`). - **CERRADO 2026-05-31:** - - CLI 3b-i (`feat/unarr-agent` 4a12f13): `GrowingSource` + `NewRemuxSource` - (reusa `transcodeSource`+`ActionRemux`, estimate = tamaño origen para copy); - `StreamServer.SetGrowingFile` + `serveGrowing` (responder Range manual: 206 - con total estimado en `Content-Range`, body chunked mientras no-final, exact - `Content-Length` al finalizar, bloqueo vía `ReadAt`); branch `remux` en - `OnStreamSession`. Tests `parseByteRange`+`serveGrowing` (full/offset/bounded/ - estimate/HEAD/416). build+vet+test verdes. - - WEB 3b-ii (`feat/unarr-brand` 10b7d602): `decidePlayMethod`→`"remux"` para - codecs compatibles en contenedor no-nativo; ruta gatea remux como direct - (versión, metadata, sin downscale, audioIndex -1); player trata `!= "hls"` - como attach nativo. lint+typecheck+2334 unit OK. - - **Smoke e2e (browser, mkv h264/aac 1080p):** `playMethod: remux`, `hlsUrls: - null`; agente `[stream …] remux (copy) → fMP4`; `/stream` HEAD 200 + GET Range - 206 con fMP4 válido (`ftyp iso6 mp41`+`moov`); browser reproduce 1080p nativo, - duration leída del fMP4, **seek a 2min OK**, **0 reqs `/hls`**. ✓ - - **Bug cazado por el smoke:** la respuesta `created` de la ruta quedó en - `playMethod === "direct" ? null` (en vez de `!== "hls"`) → devolvía `hlsUrls` - para remux. Corregido (el player usaba streamUrls igual, pero inconsistente). - - **Limitación:** seek-adelante a zona aún-no-remuxada bloquea hasta que el copy - (rápido) la alcanza; seek-atrás inmediato. Audio no-default / subs-bitmap → - siguen yendo por HLS (gate `audioIndex == -1`). - - **Fase 3c — capability negotiation (device-profile).** El web envía `{maxHeight, codecs:[h264,hevc,av1], containers}` (de UA + `canPlayType`). `decidePlayMethod` se hace device-aware: p.ej. Safari/AppleTV que reproduce HEVC diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index ef999a3..11ccf0c 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -618,41 +618,6 @@ func runDaemonStart() error { log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) return } - - // Remux path (hueco #3 / 3b): codecs are browser-native (h264/aac) but - // the container isn't (mkv). ffmpeg `-c copy` → growing fMP4 served raw - // over /stream — no video re-encode, no HLS. The web decided this from - // scan metadata + version gate; we still need ffmpeg (copy uses it). - if sess.PlayMethod == "remux" { - probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second) - probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath) - cancelProbe() - if perr != nil { - log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr) - return - } - remuxCtx, remuxCancel := context.WithCancel(ctx) - src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName) - if serr != nil { - remuxCancel() - log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr) - return - } - streamSrv.SetGrowingFile(src, sess.TaskID) - // cancel stops the ffmpeg copy; SetGrowingFile/ClearFile also Close() - // the source, so the temp file is always cleaned up. - playerSessionRegistry.add(sess.SessionID, func() { remuxCancel(); streamSrv.ClearFile() }) - log.Printf("[stream %s] remux (copy) → fMP4: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath)) - go func() { - rctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil { - log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err) - } - }() - return - } - hlsCtx, hlsCancel := context.WithCancel(ctx) playerSessionRegistry.add(sess.SessionID, hlsCancel) hlsCfg := engine.HLSSessionConfig{ diff --git a/internal/engine/stream_growing_test.go b/internal/engine/stream_growing_test.go deleted file mode 100644 index cb16c1f..0000000 --- a/internal/engine/stream_growing_test.go +++ /dev/null @@ -1,197 +0,0 @@ -package engine - -import ( - "io" - "net/http" - "net/http/httptest" - "testing" -) - -// fakeGrowing is a GrowingSource backed by a fixed byte slice. When final is -// true it behaves like a completed remux (ReadAt returns io.EOF at the end); -// est overrides the advertised estimate (0 = use len(data)). -type fakeGrowing struct { - data []byte - final bool - est int64 -} - -func (f *fakeGrowing) ReadAt(p []byte, off int64) (int, error) { - if off < 0 || off >= int64(len(f.data)) { - return 0, io.EOF - } - n := copy(p, f.data[off:]) - if int(off)+n >= len(f.data) { - return n, io.EOF - } - return n, nil -} -func (f *fakeGrowing) Size() int64 { return int64(len(f.data)) } -func (f *fakeGrowing) Final() bool { return f.final } -func (f *fakeGrowing) EstimatedSize() int64 { - if f.est > 0 { - return f.est - } - return int64(len(f.data)) -} -func (f *fakeGrowing) FileName() string { return "movie.mp4" } -func (f *fakeGrowing) Close() error { return nil } - -func TestParseByteRange(t *testing.T) { - cases := []struct { - in string - start, end int64 - }{ - {"", 0, -1}, - {"bytes=0-", 0, -1}, - {"bytes=100-", 100, -1}, - {"bytes=5-9", 5, 9}, - {"bytes=0-0", 0, 0}, - {"bytes=10-19,40-49", 10, 19}, // first range only - {"bytes=-500", 0, -1}, // suffix unsupported → open from 0 - {"garbage", 0, -1}, - {"bytes=", 0, -1}, - } - for _, c := range cases { - s, e := parseByteRange(c.in) - if s != c.start || e != c.end { - t.Errorf("parseByteRange(%q) = (%d,%d), want (%d,%d)", c.in, s, e, c.start, c.end) - } - } -} - -func TestServeGrowing_FinalFullRequest(t *testing.T) { - data := []byte("0123456789abcdef") - src := &fakeGrowing{data: data, final: true} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodGet, "/stream", nil) - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - res := rec.Result() - if res.StatusCode != http.StatusPartialContent { - t.Fatalf("status = %d, want 206", res.StatusCode) - } - if got := res.Header.Get("Content-Range"); got != "bytes 0-15/16" { - t.Errorf("Content-Range = %q, want bytes 0-15/16", got) - } - if got := res.Header.Get("Accept-Ranges"); got != "bytes" { - t.Errorf("Accept-Ranges = %q, want bytes", got) - } - if got := res.Header.Get("Content-Type"); got != "video/mp4" { - t.Errorf("Content-Type = %q, want video/mp4", got) - } - // Final + open-ended → exact Content-Length. - if got := res.Header.Get("Content-Length"); got != "16" { - t.Errorf("Content-Length = %q, want 16", got) - } - if body := rec.Body.String(); body != string(data) { - t.Errorf("body = %q, want %q", body, string(data)) - } -} - -func TestServeGrowing_OffsetRange(t *testing.T) { - data := []byte("0123456789abcdef") - src := &fakeGrowing{data: data, final: true} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodGet, "/stream", nil) - req.Header.Set("Range", "bytes=10-") - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - res := rec.Result() - if res.StatusCode != http.StatusPartialContent { - t.Fatalf("status = %d, want 206", res.StatusCode) - } - if got := res.Header.Get("Content-Range"); got != "bytes 10-15/16" { - t.Errorf("Content-Range = %q, want bytes 10-15/16", got) - } - if body := rec.Body.String(); body != "abcdef" { - t.Errorf("body = %q, want abcdef", body) - } -} - -func TestServeGrowing_BoundedRange(t *testing.T) { - data := []byte("0123456789abcdef") - src := &fakeGrowing{data: data, final: true} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodGet, "/stream", nil) - req.Header.Set("Range", "bytes=5-9") - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - res := rec.Result() - if res.StatusCode != http.StatusPartialContent { - t.Fatalf("status = %d, want 206", res.StatusCode) - } - if got := res.Header.Get("Content-Range"); got != "bytes 5-9/16" { - t.Errorf("Content-Range = %q, want bytes 5-9/16", got) - } - if body := rec.Body.String(); body != "56789" { - t.Errorf("body = %q, want 56789 (exactly the requested 5 bytes)", body) - } -} - -func TestServeGrowing_EstimateUsedWhileNotFinal(t *testing.T) { - // Not final: only 8 bytes produced, but estimate says 100. The advertised - // total is the estimate (scrubber timeline); body is what exists so far. - src := &fakeGrowing{data: []byte("01234567"), final: false, est: 100} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodGet, "/stream", nil) - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - res := rec.Result() - if res.StatusCode != http.StatusPartialContent { - t.Fatalf("status = %d, want 206", res.StatusCode) - } - if got := res.Header.Get("Content-Range"); got != "bytes 0-99/100" { - t.Errorf("Content-Range = %q, want bytes 0-99/100 (estimate)", got) - } - // Not final → no exact Content-Length (chunked) so we never promise bytes - // a still-running remux might not produce. - if got := res.Header.Get("Content-Length"); got != "" { - t.Errorf("Content-Length = %q, want empty (chunked) while not final", got) - } - if body := rec.Body.String(); body != "01234567" { - t.Errorf("body = %q, want 01234567 (bytes produced so far)", body) - } -} - -func TestServeGrowing_HeadProbe(t *testing.T) { - src := &fakeGrowing{data: make([]byte, 0), final: false, est: 4242} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodHead, "/stream", nil) - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - res := rec.Result() - if res.StatusCode != http.StatusOK { - t.Fatalf("HEAD status = %d, want 200", res.StatusCode) - } - if got := res.Header.Get("Content-Length"); got != "4242" { - t.Errorf("HEAD Content-Length = %q, want 4242", got) - } - if rec.Body.Len() != 0 { - t.Errorf("HEAD body = %d bytes, want 0", rec.Body.Len()) - } -} - -func TestServeGrowing_RangeBeyondTotal(t *testing.T) { - src := &fakeGrowing{data: []byte("0123456789"), final: true} - ss := &StreamServer{} - - req := httptest.NewRequest(http.MethodGet, "/stream", nil) - req.Header.Set("Range", "bytes=999-") - rec := httptest.NewRecorder() - ss.serveGrowing(rec, req, src) - - if rec.Result().StatusCode != http.StatusRequestedRangeNotSatisfiable { - t.Errorf("status = %d, want 416", rec.Result().StatusCode) - } -} diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index fb3e76c..a5440e6 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -38,31 +38,13 @@ type FileProvider interface { FileSize() int64 } -// GrowingSource is a /stream source whose bytes are produced over time by an -// ffmpeg remux/transcode to a temp file (see transcodeSource). It is served -// via manual Range handling (serveGrowing) instead of http.ServeContent, -// which assumes a complete, fixed-size, seekable file. Used by direct-play's -// remux path (hueco #3 / 3b): mkv h264/aac → progressive fMP4, no re-encode. -type GrowingSource interface { - // ReadAt blocks until off+len(p) bytes have been produced, the source is - // final, or a timeout elapses; near the live edge it returns a short - // (n>0, nil) read so the caller can stream what exists so far. - ReadAt(p []byte, off int64) (int, error) - Size() int64 // bytes produced so far - Final() bool // ffmpeg exited — Size() is now the true total - EstimatedSize() int64 // expected final size, for the scrubber timeline - FileName() string - Close() error -} - // StreamServer is a persistent HTTP server that serves one file at a time. // Start it once with Listen(), then swap files with SetFile()/ClearFile(). // The server stays alive for the entire daemon lifecycle — no port churn. type StreamServer struct { mu sync.RWMutex provider FileProvider - growing GrowingSource // set instead of provider for the progressive-remux path (3b) - taskID string // current task being streamed + taskID string // current task being streamed server *http.Server port int @@ -288,14 +270,9 @@ func (ss *StreamServer) Listen(ctx context.Context) error { // SetFile atomically swaps the file being served and resets progress tracking. func (ss *StreamServer) SetFile(provider FileProvider, taskID string) { ss.mu.Lock() - prevGrowing := ss.growing ss.provider = provider - ss.growing = nil // a raw-file provider supersedes any in-flight remux ss.taskID = taskID ss.mu.Unlock() - if prevGrowing != nil { - _ = prevGrowing.Close() // stop the orphan ffmpeg + drop its temp file - } ss.totalFileSize.Store(provider.FileSize()) ss.lastActivity.Store(time.Now().UnixNano()) ss.maxByteOffset.Store(0) @@ -318,40 +295,12 @@ func (ss *StreamServer) SetFile(provider FileProvider, taskID string) { } } -// SetGrowingFile serves a progressive-remux source on /stream (hueco #3 / 3b): -// ffmpeg `-c copy` mkv→fMP4 to a growing temp file, range-served via -// serveGrowing. Supersedes any prior provider/growing source (single-viewer). -func (ss *StreamServer) SetGrowingFile(src GrowingSource, taskID string) { - ss.mu.Lock() - prevGrowing := ss.growing - ss.growing = src - ss.provider = nil - ss.taskID = taskID - ss.mu.Unlock() - if prevGrowing != nil { - _ = prevGrowing.Close() - } - ss.totalFileSize.Store(src.EstimatedSize()) - ss.lastActivity.Store(time.Now().UnixNano()) - ss.maxByteOffset.Store(0) - ss.topReaderID.Store(0) - // Rate-limit + bitrate tracking are for raw-file playback; the remux pump - // has its own pacing (ffmpeg copy is I/O-bound), so leave them at zero. - ss.bitrateBps.Store(0) - ss.durationSec.Store(0) -} - // ClearFile stops serving any file. Subsequent requests return 404. func (ss *StreamServer) ClearFile() { ss.mu.Lock() ss.provider = nil - prevGrowing := ss.growing - ss.growing = nil ss.taskID = "" ss.mu.Unlock() - if prevGrowing != nil { - _ = prevGrowing.Close() - } ss.totalFileSize.Store(0) ss.maxByteOffset.Store(0) ss.topReaderID.Store(0) @@ -366,11 +315,11 @@ func (ss *StreamServer) CurrentTaskID() string { return ss.taskID } -// HasFile returns true if a file (raw provider or growing remux) is being served. +// HasFile returns true if a file is currently being served. func (ss *StreamServer) HasFile() bool { ss.mu.RLock() defer ss.mu.RUnlock() - return ss.provider != nil || ss.growing != nil + return ss.provider != nil } // URL returns the best single stream URL (backward compat). @@ -726,13 +675,12 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range")) - // Get current source (raw provider or growing remux; nil if none). + // Get current provider (may be nil if no file is being served) ss.mu.RLock() provider := ss.provider - growing := ss.growing ss.mu.RUnlock() - if provider == nil && growing == nil { + if provider == nil { http.Error(w, "no active stream", http.StatusNotFound) return } @@ -749,13 +697,6 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { return } - // Progressive-remux path (3b): a growing fMP4 produced by ffmpeg `-c copy`. - // Range-served manually because http.ServeContent needs a complete file. - if growing != nil { - ss.serveGrowing(w, r, growing) - return - } - rawReader := provider.NewFileReader(r.Context()) if rawReader == nil { http.Error(w, "file not found", http.StatusNotFound) @@ -802,134 +743,6 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, provider.FileName(), time.Time{}, reader) } -// serveGrowing range-serves a growing remux source (hueco #3 / 3b). Unlike -// http.ServeContent it can't rely on a fixed file size: ffmpeg `-c copy` is -// still writing, and the final byte count isn't known until it exits. So we: -// -// - advertise an ESTIMATED total (≈ source file size for a copy remux) in -// Content-Range so the browser scrubber has a timeline; -// - reply 206 and stream from the requested offset, blocking via ReadAt for -// not-yet-produced bytes, until the explicit range end or the real EOF; -// - send the body chunked (no Content-Length) for non-final sources, since -// the true length differs from the estimate — promising an exact length we -// can't fulfil would hang the browser. When the source is already final we -// send an exact Content-Length. -// -// Seeking forward into a not-yet-remuxed region blocks briefly until the copy -// (I/O-bound, fast) catches up; seeking back to produced bytes is immediate. -func (ss *StreamServer) serveGrowing(w http.ResponseWriter, r *http.Request, src GrowingSource) { - w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Content-Type", "video/mp4") - w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", src.FileName())) - - // Total to advertise: exact when ffmpeg has exited, else the estimate. - total := src.EstimatedSize() - if src.Final() { - total = src.Size() - } - if total <= 0 { - total = src.Size() - } - - start, explicitEnd := parseByteRange(r.Header.Get("Range")) - if total > 0 && start >= total { - // Range beyond what we expect to produce — let the browser recover. - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", total)) - http.Error(w, "range not satisfiable", http.StatusRequestedRangeNotSatisfiable) - return - } - - if r.Method == http.MethodHead { - if total > 0 { - w.Header().Set("Content-Length", strconv.FormatInt(total, 10)) - } - w.WriteHeader(http.StatusOK) - return - } - - end := total - 1 - if explicitEnd >= 0 && explicitEnd < end { - end = explicitEnd - } - if total > 0 { - w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total)) - } - // Exact Content-Length only when the source is final (true size known) so - // we never promise bytes a still-running remux might not produce. - if src.Final() && explicitEnd < 0 { - w.Header().Set("Content-Length", strconv.FormatInt(src.Size()-start, 10)) - } - w.WriteHeader(http.StatusPartialContent) - - buf := make([]byte, 256*1024) - off := start - for { - if explicitEnd >= 0 && off > explicitEnd { - return - } - if r.Context().Err() != nil { - return // client disconnected / request cancelled - } - n, err := src.ReadAt(buf, off) - if n > 0 { - toWrite := n - if explicitEnd >= 0 { - if remaining := explicitEnd - off + 1; int64(toWrite) > remaining { - toWrite = int(remaining) - } - } - if _, werr := w.Write(buf[:toWrite]); werr != nil { - return // client gone - } - off += int64(toWrite) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - } - if err != nil { - // transcodeSource returns io.EOF only at the true (final) end; any - // other error means ffmpeg failed or the read timed out. Either - // way the stream is over — close the body. - return - } - } -} - -// parseByteRange parses a single "bytes=start-[end]" header into (start, end). -// end is -1 when open-ended or absent. Multi-range and suffix ranges -// ("bytes=-N") are not supported (returns start=0) — the browser falls back to -// a normal open-ended request, which is all