feat(stream): progressive fMP4 remux source for /stream (hueco #3 / 3b-i)
Agent side of 3b: serve a growing ffmpeg `-c copy` remux (mkv h264/aac → fragmented MP4) over /stream with no video re-encode. Dormant until the web sends PlayMethod="remux" (3b-ii), so this commit changes no live behavior. - GrowingSource interface + transcodeSource already satisfies it; estimate is the source file size for copy actions (≈ remux output) vs bitrate×duration for real transcodes. - NewRemuxSource: ffmpeg -c copy → growing fMP4 temp, returned as GrowingSource. - StreamServer.SetGrowingFile + serveGrowing: manual Range responder for a growing source (http.ServeContent needs a fixed size). 206 with an estimated total in Content-Range; chunked body while not final (never promise bytes a running remux might not produce); exact Content-Length once final. Blocks via ReadAt for not-yet-produced bytes; forward seek waits, backward seek instant. - daemon OnStreamSession: PlayMethod=="remux" → NewRemuxSource + SetGrowingFile + MarkSessionReady (after the ffmpeg check; copy still needs ffmpeg). - Tests: parseByteRange + serveGrowing (full/offset/bounded/estimate/HEAD/416).
This commit is contained in:
parent
6e8bca2ac4
commit
4a12f13b96
4 changed files with 450 additions and 6 deletions
|
|
@ -618,6 +618,41 @@ func runDaemonStart() error {
|
|||
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID))
|
||||
return
|
||||
}
|
||||
|
||||
// Remux path (hueco #3 / 3b): codecs are browser-native (h264/aac) but
|
||||
// the container isn't (mkv). ffmpeg `-c copy` → growing fMP4 served raw
|
||||
// over /stream — no video re-encode, no HLS. The web decided this from
|
||||
// scan metadata + version gate; we still need ffmpeg (copy uses it).
|
||||
if sess.PlayMethod == "remux" {
|
||||
probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second)
|
||||
probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath)
|
||||
cancelProbe()
|
||||
if perr != nil {
|
||||
log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr)
|
||||
return
|
||||
}
|
||||
remuxCtx, remuxCancel := context.WithCancel(ctx)
|
||||
src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName)
|
||||
if serr != nil {
|
||||
remuxCancel()
|
||||
log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr)
|
||||
return
|
||||
}
|
||||
streamSrv.SetGrowingFile(src, sess.TaskID)
|
||||
// cancel stops the ffmpeg copy; SetGrowingFile/ClearFile also Close()
|
||||
// the source, so the temp file is always cleaned up.
|
||||
playerSessionRegistry.add(sess.SessionID, func() { remuxCancel(); streamSrv.ClearFile() })
|
||||
log.Printf("[stream %s] remux (copy) → fMP4: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
|
||||
go func() {
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
hlsCtx, hlsCancel := context.WithCancel(ctx)
|
||||
playerSessionRegistry.add(sess.SessionID, hlsCancel)
|
||||
hlsCfg := engine.HLSSessionConfig{
|
||||
|
|
|
|||
197
internal/engine/stream_growing_test.go
Normal file
197
internal/engine/stream_growing_test.go
Normal file
|
|
@ -0,0 +1,197 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// fakeGrowing is a GrowingSource backed by a fixed byte slice. When final is
|
||||
// true it behaves like a completed remux (ReadAt returns io.EOF at the end);
|
||||
// est overrides the advertised estimate (0 = use len(data)).
|
||||
type fakeGrowing struct {
|
||||
data []byte
|
||||
final bool
|
||||
est int64
|
||||
}
|
||||
|
||||
func (f *fakeGrowing) ReadAt(p []byte, off int64) (int, error) {
|
||||
if off < 0 || off >= int64(len(f.data)) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(p, f.data[off:])
|
||||
if int(off)+n >= len(f.data) {
|
||||
return n, io.EOF
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
func (f *fakeGrowing) Size() int64 { return int64(len(f.data)) }
|
||||
func (f *fakeGrowing) Final() bool { return f.final }
|
||||
func (f *fakeGrowing) EstimatedSize() int64 {
|
||||
if f.est > 0 {
|
||||
return f.est
|
||||
}
|
||||
return int64(len(f.data))
|
||||
}
|
||||
func (f *fakeGrowing) FileName() string { return "movie.mp4" }
|
||||
func (f *fakeGrowing) Close() error { return nil }
|
||||
|
||||
func TestParseByteRange(t *testing.T) {
|
||||
cases := []struct {
|
||||
in string
|
||||
start, end int64
|
||||
}{
|
||||
{"", 0, -1},
|
||||
{"bytes=0-", 0, -1},
|
||||
{"bytes=100-", 100, -1},
|
||||
{"bytes=5-9", 5, 9},
|
||||
{"bytes=0-0", 0, 0},
|
||||
{"bytes=10-19,40-49", 10, 19}, // first range only
|
||||
{"bytes=-500", 0, -1}, // suffix unsupported → open from 0
|
||||
{"garbage", 0, -1},
|
||||
{"bytes=", 0, -1},
|
||||
}
|
||||
for _, c := range cases {
|
||||
s, e := parseByteRange(c.in)
|
||||
if s != c.start || e != c.end {
|
||||
t.Errorf("parseByteRange(%q) = (%d,%d), want (%d,%d)", c.in, s, e, c.start, c.end)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_FinalFullRequest(t *testing.T) {
|
||||
data := []byte("0123456789abcdef")
|
||||
src := &fakeGrowing{data: data, final: true}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/stream", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
res := rec.Result()
|
||||
if res.StatusCode != http.StatusPartialContent {
|
||||
t.Fatalf("status = %d, want 206", res.StatusCode)
|
||||
}
|
||||
if got := res.Header.Get("Content-Range"); got != "bytes 0-15/16" {
|
||||
t.Errorf("Content-Range = %q, want bytes 0-15/16", got)
|
||||
}
|
||||
if got := res.Header.Get("Accept-Ranges"); got != "bytes" {
|
||||
t.Errorf("Accept-Ranges = %q, want bytes", got)
|
||||
}
|
||||
if got := res.Header.Get("Content-Type"); got != "video/mp4" {
|
||||
t.Errorf("Content-Type = %q, want video/mp4", got)
|
||||
}
|
||||
// Final + open-ended → exact Content-Length.
|
||||
if got := res.Header.Get("Content-Length"); got != "16" {
|
||||
t.Errorf("Content-Length = %q, want 16", got)
|
||||
}
|
||||
if body := rec.Body.String(); body != string(data) {
|
||||
t.Errorf("body = %q, want %q", body, string(data))
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_OffsetRange(t *testing.T) {
|
||||
data := []byte("0123456789abcdef")
|
||||
src := &fakeGrowing{data: data, final: true}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/stream", nil)
|
||||
req.Header.Set("Range", "bytes=10-")
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
res := rec.Result()
|
||||
if res.StatusCode != http.StatusPartialContent {
|
||||
t.Fatalf("status = %d, want 206", res.StatusCode)
|
||||
}
|
||||
if got := res.Header.Get("Content-Range"); got != "bytes 10-15/16" {
|
||||
t.Errorf("Content-Range = %q, want bytes 10-15/16", got)
|
||||
}
|
||||
if body := rec.Body.String(); body != "abcdef" {
|
||||
t.Errorf("body = %q, want abcdef", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_BoundedRange(t *testing.T) {
|
||||
data := []byte("0123456789abcdef")
|
||||
src := &fakeGrowing{data: data, final: true}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/stream", nil)
|
||||
req.Header.Set("Range", "bytes=5-9")
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
res := rec.Result()
|
||||
if res.StatusCode != http.StatusPartialContent {
|
||||
t.Fatalf("status = %d, want 206", res.StatusCode)
|
||||
}
|
||||
if got := res.Header.Get("Content-Range"); got != "bytes 5-9/16" {
|
||||
t.Errorf("Content-Range = %q, want bytes 5-9/16", got)
|
||||
}
|
||||
if body := rec.Body.String(); body != "56789" {
|
||||
t.Errorf("body = %q, want 56789 (exactly the requested 5 bytes)", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_EstimateUsedWhileNotFinal(t *testing.T) {
|
||||
// Not final: only 8 bytes produced, but estimate says 100. The advertised
|
||||
// total is the estimate (scrubber timeline); body is what exists so far.
|
||||
src := &fakeGrowing{data: []byte("01234567"), final: false, est: 100}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/stream", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
res := rec.Result()
|
||||
if res.StatusCode != http.StatusPartialContent {
|
||||
t.Fatalf("status = %d, want 206", res.StatusCode)
|
||||
}
|
||||
if got := res.Header.Get("Content-Range"); got != "bytes 0-99/100" {
|
||||
t.Errorf("Content-Range = %q, want bytes 0-99/100 (estimate)", got)
|
||||
}
|
||||
// Not final → no exact Content-Length (chunked) so we never promise bytes
|
||||
// a still-running remux might not produce.
|
||||
if got := res.Header.Get("Content-Length"); got != "" {
|
||||
t.Errorf("Content-Length = %q, want empty (chunked) while not final", got)
|
||||
}
|
||||
if body := rec.Body.String(); body != "01234567" {
|
||||
t.Errorf("body = %q, want 01234567 (bytes produced so far)", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_HeadProbe(t *testing.T) {
|
||||
src := &fakeGrowing{data: make([]byte, 0), final: false, est: 4242}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodHead, "/stream", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
res := rec.Result()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
t.Fatalf("HEAD status = %d, want 200", res.StatusCode)
|
||||
}
|
||||
if got := res.Header.Get("Content-Length"); got != "4242" {
|
||||
t.Errorf("HEAD Content-Length = %q, want 4242", got)
|
||||
}
|
||||
if rec.Body.Len() != 0 {
|
||||
t.Errorf("HEAD body = %d bytes, want 0", rec.Body.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeGrowing_RangeBeyondTotal(t *testing.T) {
|
||||
src := &fakeGrowing{data: []byte("0123456789"), final: true}
|
||||
ss := &StreamServer{}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/stream", nil)
|
||||
req.Header.Set("Range", "bytes=999-")
|
||||
rec := httptest.NewRecorder()
|
||||
ss.serveGrowing(rec, req, src)
|
||||
|
||||
if rec.Result().StatusCode != http.StatusRequestedRangeNotSatisfiable {
|
||||
t.Errorf("status = %d, want 416", rec.Result().StatusCode)
|
||||
}
|
||||
}
|
||||
|
|
@ -38,12 +38,30 @@ type FileProvider interface {
|
|||
FileSize() int64
|
||||
}
|
||||
|
||||
// GrowingSource is a /stream source whose bytes are produced over time by an
|
||||
// ffmpeg remux/transcode to a temp file (see transcodeSource). It is served
|
||||
// via manual Range handling (serveGrowing) instead of http.ServeContent,
|
||||
// which assumes a complete, fixed-size, seekable file. Used by direct-play's
|
||||
// remux path (hueco #3 / 3b): mkv h264/aac → progressive fMP4, no re-encode.
|
||||
type GrowingSource interface {
|
||||
// ReadAt blocks until off+len(p) bytes have been produced, the source is
|
||||
// final, or a timeout elapses; near the live edge it returns a short
|
||||
// (n>0, nil) read so the caller can stream what exists so far.
|
||||
ReadAt(p []byte, off int64) (int, error)
|
||||
Size() int64 // bytes produced so far
|
||||
Final() bool // ffmpeg exited — Size() is now the true total
|
||||
EstimatedSize() int64 // expected final size, for the scrubber timeline
|
||||
FileName() string
|
||||
Close() error
|
||||
}
|
||||
|
||||
// StreamServer is a persistent HTTP server that serves one file at a time.
|
||||
// Start it once with Listen(), then swap files with SetFile()/ClearFile().
|
||||
// The server stays alive for the entire daemon lifecycle — no port churn.
|
||||
type StreamServer struct {
|
||||
mu sync.RWMutex
|
||||
provider FileProvider
|
||||
growing GrowingSource // set instead of provider for the progressive-remux path (3b)
|
||||
taskID string // current task being streamed
|
||||
|
||||
server *http.Server
|
||||
|
|
@ -270,9 +288,14 @@ func (ss *StreamServer) Listen(ctx context.Context) error {
|
|||
// SetFile atomically swaps the file being served and resets progress tracking.
|
||||
func (ss *StreamServer) SetFile(provider FileProvider, taskID string) {
|
||||
ss.mu.Lock()
|
||||
prevGrowing := ss.growing
|
||||
ss.provider = provider
|
||||
ss.growing = nil // a raw-file provider supersedes any in-flight remux
|
||||
ss.taskID = taskID
|
||||
ss.mu.Unlock()
|
||||
if prevGrowing != nil {
|
||||
_ = prevGrowing.Close() // stop the orphan ffmpeg + drop its temp file
|
||||
}
|
||||
ss.totalFileSize.Store(provider.FileSize())
|
||||
ss.lastActivity.Store(time.Now().UnixNano())
|
||||
ss.maxByteOffset.Store(0)
|
||||
|
|
@ -295,12 +318,40 @@ func (ss *StreamServer) SetFile(provider FileProvider, taskID string) {
|
|||
}
|
||||
}
|
||||
|
||||
// SetGrowingFile serves a progressive-remux source on /stream (hueco #3 / 3b):
|
||||
// ffmpeg `-c copy` mkv→fMP4 to a growing temp file, range-served via
|
||||
// serveGrowing. Supersedes any prior provider/growing source (single-viewer).
|
||||
func (ss *StreamServer) SetGrowingFile(src GrowingSource, taskID string) {
|
||||
ss.mu.Lock()
|
||||
prevGrowing := ss.growing
|
||||
ss.growing = src
|
||||
ss.provider = nil
|
||||
ss.taskID = taskID
|
||||
ss.mu.Unlock()
|
||||
if prevGrowing != nil {
|
||||
_ = prevGrowing.Close()
|
||||
}
|
||||
ss.totalFileSize.Store(src.EstimatedSize())
|
||||
ss.lastActivity.Store(time.Now().UnixNano())
|
||||
ss.maxByteOffset.Store(0)
|
||||
ss.topReaderID.Store(0)
|
||||
// Rate-limit + bitrate tracking are for raw-file playback; the remux pump
|
||||
// has its own pacing (ffmpeg copy is I/O-bound), so leave them at zero.
|
||||
ss.bitrateBps.Store(0)
|
||||
ss.durationSec.Store(0)
|
||||
}
|
||||
|
||||
// ClearFile stops serving any file. Subsequent requests return 404.
|
||||
func (ss *StreamServer) ClearFile() {
|
||||
ss.mu.Lock()
|
||||
ss.provider = nil
|
||||
prevGrowing := ss.growing
|
||||
ss.growing = nil
|
||||
ss.taskID = ""
|
||||
ss.mu.Unlock()
|
||||
if prevGrowing != nil {
|
||||
_ = prevGrowing.Close()
|
||||
}
|
||||
ss.totalFileSize.Store(0)
|
||||
ss.maxByteOffset.Store(0)
|
||||
ss.topReaderID.Store(0)
|
||||
|
|
@ -315,11 +366,11 @@ func (ss *StreamServer) CurrentTaskID() string {
|
|||
return ss.taskID
|
||||
}
|
||||
|
||||
// HasFile returns true if a file is currently being served.
|
||||
// HasFile returns true if a file (raw provider or growing remux) is being served.
|
||||
func (ss *StreamServer) HasFile() bool {
|
||||
ss.mu.RLock()
|
||||
defer ss.mu.RUnlock()
|
||||
return ss.provider != nil
|
||||
return ss.provider != nil || ss.growing != nil
|
||||
}
|
||||
|
||||
// URL returns the best single stream URL (backward compat).
|
||||
|
|
@ -675,12 +726,13 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
|||
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
||||
log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range"))
|
||||
|
||||
// Get current provider (may be nil if no file is being served)
|
||||
// Get current source (raw provider or growing remux; nil if none).
|
||||
ss.mu.RLock()
|
||||
provider := ss.provider
|
||||
growing := ss.growing
|
||||
ss.mu.RUnlock()
|
||||
|
||||
if provider == nil {
|
||||
if provider == nil && growing == nil {
|
||||
http.Error(w, "no active stream", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
|
@ -697,6 +749,13 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// Progressive-remux path (3b): a growing fMP4 produced by ffmpeg `-c copy`.
|
||||
// Range-served manually because http.ServeContent needs a complete file.
|
||||
if growing != nil {
|
||||
ss.serveGrowing(w, r, growing)
|
||||
return
|
||||
}
|
||||
|
||||
rawReader := provider.NewFileReader(r.Context())
|
||||
if rawReader == nil {
|
||||
http.Error(w, "file not found", http.StatusNotFound)
|
||||
|
|
@ -743,6 +802,134 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
|||
http.ServeContent(w, r, provider.FileName(), time.Time{}, reader)
|
||||
}
|
||||
|
||||
// serveGrowing range-serves a growing remux source (hueco #3 / 3b). Unlike
|
||||
// http.ServeContent it can't rely on a fixed file size: ffmpeg `-c copy` is
|
||||
// still writing, and the final byte count isn't known until it exits. So we:
|
||||
//
|
||||
// - advertise an ESTIMATED total (≈ source file size for a copy remux) in
|
||||
// Content-Range so the browser scrubber has a timeline;
|
||||
// - reply 206 and stream from the requested offset, blocking via ReadAt for
|
||||
// not-yet-produced bytes, until the explicit range end or the real EOF;
|
||||
// - send the body chunked (no Content-Length) for non-final sources, since
|
||||
// the true length differs from the estimate — promising an exact length we
|
||||
// can't fulfil would hang the browser. When the source is already final we
|
||||
// send an exact Content-Length.
|
||||
//
|
||||
// Seeking forward into a not-yet-remuxed region blocks briefly until the copy
|
||||
// (I/O-bound, fast) catches up; seeking back to produced bytes is immediate.
|
||||
func (ss *StreamServer) serveGrowing(w http.ResponseWriter, r *http.Request, src GrowingSource) {
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.Header().Set("Content-Type", "video/mp4")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", src.FileName()))
|
||||
|
||||
// Total to advertise: exact when ffmpeg has exited, else the estimate.
|
||||
total := src.EstimatedSize()
|
||||
if src.Final() {
|
||||
total = src.Size()
|
||||
}
|
||||
if total <= 0 {
|
||||
total = src.Size()
|
||||
}
|
||||
|
||||
start, explicitEnd := parseByteRange(r.Header.Get("Range"))
|
||||
if total > 0 && start >= total {
|
||||
// Range beyond what we expect to produce — let the browser recover.
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", total))
|
||||
http.Error(w, "range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method == http.MethodHead {
|
||||
if total > 0 {
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(total, 10))
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
end := total - 1
|
||||
if explicitEnd >= 0 && explicitEnd < end {
|
||||
end = explicitEnd
|
||||
}
|
||||
if total > 0 {
|
||||
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total))
|
||||
}
|
||||
// Exact Content-Length only when the source is final (true size known) so
|
||||
// we never promise bytes a still-running remux might not produce.
|
||||
if src.Final() && explicitEnd < 0 {
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(src.Size()-start, 10))
|
||||
}
|
||||
w.WriteHeader(http.StatusPartialContent)
|
||||
|
||||
buf := make([]byte, 256*1024)
|
||||
off := start
|
||||
for {
|
||||
if explicitEnd >= 0 && off > explicitEnd {
|
||||
return
|
||||
}
|
||||
if r.Context().Err() != nil {
|
||||
return // client disconnected / request cancelled
|
||||
}
|
||||
n, err := src.ReadAt(buf, off)
|
||||
if n > 0 {
|
||||
toWrite := n
|
||||
if explicitEnd >= 0 {
|
||||
if remaining := explicitEnd - off + 1; int64(toWrite) > remaining {
|
||||
toWrite = int(remaining)
|
||||
}
|
||||
}
|
||||
if _, werr := w.Write(buf[:toWrite]); werr != nil {
|
||||
return // client gone
|
||||
}
|
||||
off += int64(toWrite)
|
||||
if f, ok := w.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// transcodeSource returns io.EOF only at the true (final) end; any
|
||||
// other error means ffmpeg failed or the read timed out. Either
|
||||
// way the stream is over — close the body.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// parseByteRange parses a single "bytes=start-[end]" header into (start, end).
|
||||
// end is -1 when open-ended or absent. Multi-range and suffix ranges
|
||||
// ("bytes=-N") are not supported (returns start=0) — the browser falls back to
|
||||
// a normal open-ended request, which is all <video> needs for a growing source.
|
||||
func parseByteRange(header string) (start, end int64) {
|
||||
end = -1
|
||||
if !strings.HasPrefix(header, "bytes=") {
|
||||
return 0, -1
|
||||
}
|
||||
spec := strings.TrimPrefix(header, "bytes=")
|
||||
if i := strings.IndexByte(spec, ','); i >= 0 {
|
||||
spec = spec[:i] // first range only
|
||||
}
|
||||
dash := strings.IndexByte(spec, '-')
|
||||
if dash < 0 {
|
||||
return 0, -1
|
||||
}
|
||||
startStr := strings.TrimSpace(spec[:dash])
|
||||
if startStr == "" {
|
||||
// Suffix range "bytes=-N" (last N bytes) is unsupported on a growing
|
||||
// source whose total isn't fixed — serve open-ended from 0 instead of
|
||||
// mis-reading N as an absolute end. fMP4 (moov at front) never needs it.
|
||||
return 0, -1
|
||||
}
|
||||
if v, err := strconv.ParseInt(startStr, 10, 64); err == nil && v >= 0 {
|
||||
start = v
|
||||
}
|
||||
if e := strings.TrimSpace(spec[dash+1:]); e != "" {
|
||||
if v, err := strconv.ParseInt(e, 10, 64); err == nil && v >= 0 {
|
||||
end = v
|
||||
}
|
||||
}
|
||||
return start, end
|
||||
}
|
||||
|
||||
// EstimatedProgress returns estimated watch progress percentage (0-100)
|
||||
// and the total duration in seconds (0 if unknown).
|
||||
func (ss *StreamServer) EstimatedProgress() (pct int, durationSec int) {
|
||||
|
|
|
|||
|
|
@ -125,7 +125,20 @@ func newTranscodeSource(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
estimate := estimateOutputSize(probe, opts)
|
||||
// Size estimate for the scrubber timeline. A copy remux (video not
|
||||
// re-encoded) lands within container overhead of the source file, so the
|
||||
// source size is a far better estimate than bitrate×duration — use it.
|
||||
// A real transcode re-encodes, so fall back to the bitrate×duration model.
|
||||
var estimate int64
|
||||
switch action {
|
||||
case ActionPassthrough, ActionRemux, ActionRemuxAudio:
|
||||
if fi, statErr := os.Stat(srcPath); statErr == nil {
|
||||
estimate = fi.Size()
|
||||
}
|
||||
}
|
||||
if estimate <= 0 {
|
||||
estimate = estimateOutputSize(probe, opts)
|
||||
}
|
||||
|
||||
t := &transcodeSource{
|
||||
tmpPath: tmpPath,
|
||||
|
|
@ -151,6 +164,18 @@ func newTranscodeSource(
|
|||
return t, nil
|
||||
}
|
||||
|
||||
// NewRemuxSource starts an ffmpeg `-c copy` remux of srcPath into a growing
|
||||
// fragmented-MP4 temp file and returns it as a GrowingSource for /stream
|
||||
// (hueco #3 / 3b). The video + audio are copied (never re-encoded), so this is
|
||||
// only valid when the codecs are already browser-native (h264 + aac) and only
|
||||
// the container needs changing — the web's decidePlayMethod enforces that
|
||||
// before sending PlayMethod="remux". The browser plays the result progressively
|
||||
// via byte-range. Caller MUST Close() it (kills ffmpeg + removes the temp file).
|
||||
func NewRemuxSource(ctx context.Context, srcPath string, probe *StreamProbe, ffmpegPath, displayName string) (GrowingSource, error) {
|
||||
opts := TranscodeOpts{Action: ActionRemux, FFmpegPath: ffmpegPath}
|
||||
return newTranscodeSource(ctx, srcPath, probe, ActionRemux, opts, displayName)
|
||||
}
|
||||
|
||||
// signalNotify wakes any goroutine blocked in ReadAt. Non-blocking: if a
|
||||
// notification is already pending the new event is folded into it (callers
|
||||
// always re-check size + final after waking, so a coalesced signal still
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue