feat(stream): progressive fMP4 remux source for /stream (hueco #3 / 3b-i)

Agent side of 3b: serve a growing ffmpeg `-c copy` remux (mkv h264/aac →
fragmented MP4) over /stream with no video re-encode. Dormant until the web
sends PlayMethod="remux" (3b-ii), so this commit changes no live behavior.

- GrowingSource interface + transcodeSource already satisfies it; estimate is
  the source file size for copy actions (≈ remux output) vs bitrate×duration
  for real transcodes.
- NewRemuxSource: ffmpeg -c copy → growing fMP4 temp, returned as GrowingSource.
- StreamServer.SetGrowingFile + serveGrowing: manual Range responder for a
  growing source (http.ServeContent needs a fixed size). 206 with an estimated
  total in Content-Range; chunked body while not final (never promise bytes a
  running remux might not produce); exact Content-Length once final. Blocks via
  ReadAt for not-yet-produced bytes; forward seek waits, backward seek instant.
- daemon OnStreamSession: PlayMethod=="remux" → NewRemuxSource + SetGrowingFile
  + MarkSessionReady (after the ffmpeg check; copy still needs ffmpeg).
- Tests: parseByteRange + serveGrowing (full/offset/bounded/estimate/HEAD/416).
This commit is contained in:
Deivid Soto 2026-05-31 11:49:31 +02:00
parent 6e8bca2ac4
commit 4a12f13b96
4 changed files with 450 additions and 6 deletions

View file

@ -38,13 +38,31 @@ type FileProvider interface {
FileSize() int64
}
// GrowingSource is a /stream source whose bytes are produced over time by an
// ffmpeg remux/transcode to a temp file (see transcodeSource). It is served
// via manual Range handling (serveGrowing) instead of http.ServeContent,
// which assumes a complete, fixed-size, seekable file. Used by direct-play's
// remux path (hueco #3 / 3b): mkv h264/aac → progressive fMP4, no re-encode.
type GrowingSource interface {
// ReadAt blocks until off+len(p) bytes have been produced, the source is
// final, or a timeout elapses; near the live edge it returns a short
// (n>0, nil) read so the caller can stream what exists so far.
ReadAt(p []byte, off int64) (int, error)
Size() int64 // bytes produced so far
Final() bool // ffmpeg exited — Size() is now the true total
EstimatedSize() int64 // expected final size, for the scrubber timeline
FileName() string
Close() error
}
// StreamServer is a persistent HTTP server that serves one file at a time.
// Start it once with Listen(), then swap files with SetFile()/ClearFile().
// The server stays alive for the entire daemon lifecycle — no port churn.
type StreamServer struct {
mu sync.RWMutex
provider FileProvider
taskID string // current task being streamed
growing GrowingSource // set instead of provider for the progressive-remux path (3b)
taskID string // current task being streamed
server *http.Server
port int
@ -270,9 +288,14 @@ func (ss *StreamServer) Listen(ctx context.Context) error {
// SetFile atomically swaps the file being served and resets progress tracking.
func (ss *StreamServer) SetFile(provider FileProvider, taskID string) {
ss.mu.Lock()
prevGrowing := ss.growing
ss.provider = provider
ss.growing = nil // a raw-file provider supersedes any in-flight remux
ss.taskID = taskID
ss.mu.Unlock()
if prevGrowing != nil {
_ = prevGrowing.Close() // stop the orphan ffmpeg + drop its temp file
}
ss.totalFileSize.Store(provider.FileSize())
ss.lastActivity.Store(time.Now().UnixNano())
ss.maxByteOffset.Store(0)
@ -295,12 +318,40 @@ func (ss *StreamServer) SetFile(provider FileProvider, taskID string) {
}
}
// SetGrowingFile serves a progressive-remux source on /stream (hueco #3 / 3b):
// ffmpeg `-c copy` mkv→fMP4 to a growing temp file, range-served via
// serveGrowing. Supersedes any prior provider/growing source (single-viewer).
func (ss *StreamServer) SetGrowingFile(src GrowingSource, taskID string) {
ss.mu.Lock()
prevGrowing := ss.growing
ss.growing = src
ss.provider = nil
ss.taskID = taskID
ss.mu.Unlock()
if prevGrowing != nil {
_ = prevGrowing.Close()
}
ss.totalFileSize.Store(src.EstimatedSize())
ss.lastActivity.Store(time.Now().UnixNano())
ss.maxByteOffset.Store(0)
ss.topReaderID.Store(0)
// Rate-limit + bitrate tracking are for raw-file playback; the remux pump
// has its own pacing (ffmpeg copy is I/O-bound), so leave them at zero.
ss.bitrateBps.Store(0)
ss.durationSec.Store(0)
}
// ClearFile stops serving any file. Subsequent requests return 404.
func (ss *StreamServer) ClearFile() {
ss.mu.Lock()
ss.provider = nil
prevGrowing := ss.growing
ss.growing = nil
ss.taskID = ""
ss.mu.Unlock()
if prevGrowing != nil {
_ = prevGrowing.Close()
}
ss.totalFileSize.Store(0)
ss.maxByteOffset.Store(0)
ss.topReaderID.Store(0)
@ -315,11 +366,11 @@ func (ss *StreamServer) CurrentTaskID() string {
return ss.taskID
}
// HasFile returns true if a file is currently being served.
// HasFile returns true if a file (raw provider or growing remux) is being served.
func (ss *StreamServer) HasFile() bool {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.provider != nil
return ss.provider != nil || ss.growing != nil
}
// URL returns the best single stream URL (backward compat).
@ -675,12 +726,13 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range"))
// Get current provider (may be nil if no file is being served)
// Get current source (raw provider or growing remux; nil if none).
ss.mu.RLock()
provider := ss.provider
growing := ss.growing
ss.mu.RUnlock()
if provider == nil {
if provider == nil && growing == nil {
http.Error(w, "no active stream", http.StatusNotFound)
return
}
@ -697,6 +749,13 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
return
}
// Progressive-remux path (3b): a growing fMP4 produced by ffmpeg `-c copy`.
// Range-served manually because http.ServeContent needs a complete file.
if growing != nil {
ss.serveGrowing(w, r, growing)
return
}
rawReader := provider.NewFileReader(r.Context())
if rawReader == nil {
http.Error(w, "file not found", http.StatusNotFound)
@ -743,6 +802,134 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, provider.FileName(), time.Time{}, reader)
}
// serveGrowing range-serves a growing remux source (hueco #3 / 3b). Unlike
// http.ServeContent it can't rely on a fixed file size: ffmpeg `-c copy` is
// still writing, and the final byte count isn't known until it exits. So we:
//
// - advertise an ESTIMATED total (≈ source file size for a copy remux) in
// Content-Range so the browser scrubber has a timeline;
// - reply 206 and stream from the requested offset, blocking via ReadAt for
// not-yet-produced bytes, until the explicit range end or the real EOF;
// - send the body chunked (no Content-Length) for non-final sources, since
// the true length differs from the estimate — promising an exact length we
// can't fulfil would hang the browser. When the source is already final we
// send an exact Content-Length.
//
// Seeking forward into a not-yet-remuxed region blocks briefly until the copy
// (I/O-bound, fast) catches up; seeking back to produced bytes is immediate.
func (ss *StreamServer) serveGrowing(w http.ResponseWriter, r *http.Request, src GrowingSource) {
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Type", "video/mp4")
w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", src.FileName()))
// Total to advertise: exact when ffmpeg has exited, else the estimate.
total := src.EstimatedSize()
if src.Final() {
total = src.Size()
}
if total <= 0 {
total = src.Size()
}
start, explicitEnd := parseByteRange(r.Header.Get("Range"))
if total > 0 && start >= total {
// Range beyond what we expect to produce — let the browser recover.
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", total))
http.Error(w, "range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
return
}
if r.Method == http.MethodHead {
if total > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(total, 10))
}
w.WriteHeader(http.StatusOK)
return
}
end := total - 1
if explicitEnd >= 0 && explicitEnd < end {
end = explicitEnd
}
if total > 0 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total))
}
// Exact Content-Length only when the source is final (true size known) so
// we never promise bytes a still-running remux might not produce.
if src.Final() && explicitEnd < 0 {
w.Header().Set("Content-Length", strconv.FormatInt(src.Size()-start, 10))
}
w.WriteHeader(http.StatusPartialContent)
buf := make([]byte, 256*1024)
off := start
for {
if explicitEnd >= 0 && off > explicitEnd {
return
}
if r.Context().Err() != nil {
return // client disconnected / request cancelled
}
n, err := src.ReadAt(buf, off)
if n > 0 {
toWrite := n
if explicitEnd >= 0 {
if remaining := explicitEnd - off + 1; int64(toWrite) > remaining {
toWrite = int(remaining)
}
}
if _, werr := w.Write(buf[:toWrite]); werr != nil {
return // client gone
}
off += int64(toWrite)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
if err != nil {
// transcodeSource returns io.EOF only at the true (final) end; any
// other error means ffmpeg failed or the read timed out. Either
// way the stream is over — close the body.
return
}
}
}
// parseByteRange parses a single "bytes=start-[end]" header into (start, end).
// end is -1 when open-ended or absent. Multi-range and suffix ranges
// ("bytes=-N") are not supported (returns start=0) — the browser falls back to
// a normal open-ended request, which is all <video> needs for a growing source.
func parseByteRange(header string) (start, end int64) {
end = -1
if !strings.HasPrefix(header, "bytes=") {
return 0, -1
}
spec := strings.TrimPrefix(header, "bytes=")
if i := strings.IndexByte(spec, ','); i >= 0 {
spec = spec[:i] // first range only
}
dash := strings.IndexByte(spec, '-')
if dash < 0 {
return 0, -1
}
startStr := strings.TrimSpace(spec[:dash])
if startStr == "" {
// Suffix range "bytes=-N" (last N bytes) is unsupported on a growing
// source whose total isn't fixed — serve open-ended from 0 instead of
// mis-reading N as an absolute end. fMP4 (moov at front) never needs it.
return 0, -1
}
if v, err := strconv.ParseInt(startStr, 10, 64); err == nil && v >= 0 {
start = v
}
if e := strings.TrimSpace(spec[dash+1:]); e != "" {
if v, err := strconv.ParseInt(e, 10, 64); err == nil && v >= 0 {
end = v
}
}
return start, end
}
// EstimatedProgress returns estimated watch progress percentage (0-100)
// and the total duration in seconds (0 if unknown).
func (ss *StreamServer) EstimatedProgress() (pct int, durationSec int) {