feat(stream): bitrate-sized readahead for play-while-download
The torrent reader used a static 5 MiB readahead — about 1.9s of a 20 Mbps 4K stream — so streaming a torrent while it downloaded outran the download and stalled. anacrolix's reader already prioritises the pieces in the readahead window ahead of the playhead (and re-prioritises on seek); the window was just too small. dynamicReadahead sizes it to ~30s of video (clamped 8-96 MiB, 24 MiB default when bitrate is unknown). The torrent provider probes the bitrate asynchronously so stream start never blocks on ffprobe; readers created after the probe resolves pick up the accurate size. Real 4K (20.7 Mbps) -> 73 MiB.
This commit is contained in:
parent
e4373454ba
commit
9c995fc4dd
6 changed files with 110 additions and 6 deletions
32
internal/engine/readahead.go
Normal file
32
internal/engine/readahead.go
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
package engine
|
||||
|
||||
// Torrent stream readahead sizing.
|
||||
//
|
||||
// anacrolix's Reader (SetResponsive + SetReadahead) already prioritises the
|
||||
// pieces in a window ahead of the read position and re-prioritises on Seek —
|
||||
// so the playhead→piece-priority feedback is built in. The problem was the
|
||||
// window: a static 5 MiB is only ~1.6s of a 25 Mbps 4K stream, so playback
|
||||
// outran the download and stalled. Sizing the window by bitrate (~30s of video)
|
||||
// keeps a real buffer ahead of the playhead.
|
||||
const (
|
||||
readaheadSeconds = 30
|
||||
minReadahead = 8 << 20 // 8 MiB
|
||||
maxReadahead = 96 << 20 // 96 MiB — cap so a seek doesn't waste a huge fetch
|
||||
defaultReadahead = 24 << 20 // 24 MiB — when bitrate is unknown (still ~5x the old 5 MiB)
|
||||
)
|
||||
|
||||
// dynamicReadahead returns the bytes-ahead window for a torrent reader given the
|
||||
// stream's bitrate (bits/sec). Unknown/zero bitrate → a generous default.
|
||||
func dynamicReadahead(bitrateBps int64) int64 {
|
||||
if bitrateBps <= 0 {
|
||||
return defaultReadahead
|
||||
}
|
||||
ra := bitrateBps / 8 * readaheadSeconds
|
||||
if ra < minReadahead {
|
||||
return minReadahead
|
||||
}
|
||||
if ra > maxReadahead {
|
||||
return maxReadahead
|
||||
}
|
||||
return ra
|
||||
}
|
||||
43
internal/engine/readahead_test.go
Normal file
43
internal/engine/readahead_test.go
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
package engine
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestDynamicReadahead(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
bitrateBps int64
|
||||
want int64
|
||||
}{
|
||||
{"unknown bitrate → default", 0, defaultReadahead},
|
||||
{"negative → default", -1, defaultReadahead},
|
||||
{"low bitrate clamps to min", 1_000_000, minReadahead}, // 1 Mbps → ~3.75 MiB < 8 MiB
|
||||
{"mid bitrate scales", 5_000_000, 5_000_000 / 8 * readaheadSeconds}, // 5 Mbps → ~18.75 MiB
|
||||
{"high bitrate within range", 25_000_000, 25_000_000 / 8 * readaheadSeconds}, // 4K ~25 Mbps → ~93.75 MiB
|
||||
{"very high clamps to max", 80_000_000, maxReadahead}, // 80 Mbps → 300 MiB > cap
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
got := dynamicReadahead(c.bitrateBps)
|
||||
if got != c.want {
|
||||
t.Errorf("dynamicReadahead(%d) = %d, want %d", c.bitrateBps, got, c.want)
|
||||
}
|
||||
if got < minReadahead && c.bitrateBps > 0 {
|
||||
t.Errorf("result %d below min %d", got, minReadahead)
|
||||
}
|
||||
if got > maxReadahead {
|
||||
t.Errorf("result %d above max %d", got, maxReadahead)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDynamicReadahead_BeatsOldStatic(t *testing.T) {
|
||||
// The whole point: every result is bigger than the old static 5 MiB that
|
||||
// stalled HD/4K.
|
||||
const oldStatic = 5 * 1024 * 1024
|
||||
for _, b := range []int64{0, 1_000_000, 8_000_000, 25_000_000, 100_000_000} {
|
||||
if got := dynamicReadahead(b); got <= oldStatic {
|
||||
t.Errorf("dynamicReadahead(%d) = %d, not bigger than the old 5 MiB", b, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -235,7 +235,9 @@ func (s *StreamEngine) WaitBuffer(ctx context.Context, progressFn func(buffered,
|
|||
func (s *StreamEngine) NewFileReader(ctx context.Context) io.ReadSeekCloser {
|
||||
reader := s.file.NewReader()
|
||||
reader.SetResponsive()
|
||||
reader.SetReadahead(5 * 1024 * 1024) // 5MB readahead
|
||||
// Generous default window (vs the old static 5 MiB that stalled HD/4K). This
|
||||
// CLI path has no bitrate probe, so dynamicReadahead(0) returns the default.
|
||||
reader.SetReadahead(dynamicReadahead(0))
|
||||
reader.SetContext(ctx)
|
||||
return reader
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1129,19 +1129,37 @@ func (p *diskFileProvider) FileSize() int64 {
|
|||
}
|
||||
|
||||
// NewTorrentFileProvider creates a FileProvider from an active torrent file.
|
||||
func NewTorrentFileProvider(file *torrent.File) FileProvider {
|
||||
return &torrentFileProvider{file: file}
|
||||
// dataDir locates the on-disk file for a best-effort bitrate probe that sizes
|
||||
// the streaming readahead. The probe runs ASYNC so stream start never blocks on
|
||||
// ffprobe (a missing header would otherwise stall up to the probe timeout);
|
||||
// until it resolves, readers use the default window, and readers created after
|
||||
// it resolves pick up the accurate size.
|
||||
func NewTorrentFileProvider(file *torrent.File, dataDir string) FileProvider {
|
||||
p := &torrentFileProvider{file: file}
|
||||
if dataDir != "" {
|
||||
go func() {
|
||||
if bps := probeMediaInfo(filepath.Join(dataDir, file.DisplayPath())).bitrateBps; bps > 0 {
|
||||
p.bitrateBps.Store(bps)
|
||||
}
|
||||
}()
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// torrentFileProvider wraps a torrent.File to implement FileProvider.
|
||||
type torrentFileProvider struct {
|
||||
file *torrent.File
|
||||
// bitrateBps sizes the reader's readahead window (see dynamicReadahead).
|
||||
// Set asynchronously by the bitrate probe; 0 until then → default window.
|
||||
bitrateBps atomic.Int64
|
||||
}
|
||||
|
||||
func (p *torrentFileProvider) NewFileReader(ctx context.Context) io.ReadSeekCloser {
|
||||
reader := p.file.NewReader()
|
||||
reader.SetResponsive()
|
||||
reader.SetReadahead(5 * 1024 * 1024)
|
||||
// Bitrate-sized window (vs the old static 5 MiB that stalled HD/4K). anacrolix
|
||||
// prioritises pieces in this window ahead of the read position + on seek.
|
||||
reader.SetReadahead(dynamicReadahead(p.bitrateBps.Load()))
|
||||
reader.SetContext(ctx)
|
||||
return reader
|
||||
}
|
||||
|
|
|
|||
|
|
@ -642,7 +642,10 @@ func (d *TorrentDownloader) GetStreamProvider(taskID string) (FileProvider, erro
|
|||
return nil, fmt.Errorf("torrent has no files")
|
||||
}
|
||||
|
||||
return NewTorrentFileProvider(video), nil
|
||||
// The provider probes the bitrate asynchronously (to size the streaming
|
||||
// readahead) — passing DataDir lets it locate the on-disk file without
|
||||
// blocking stream start.
|
||||
return NewTorrentFileProvider(video, d.cfg.DataDir), nil
|
||||
}
|
||||
|
||||
// VideoExts is the canonical set of video file extensions used for file selection.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue