Hueco #3 / 3c (CLI). NewRemuxSource now copies the video for any browser-decodable codec: h264, or HEVC/AV1 when the web says the device decodes them (caps). HEVC is muxed with -tag:v hvc1 (Apple requirement), and non-aac audio (ac3/eac3/dts) is transcoded to aac while the video is still copied (ActionRemuxAudio) — this covers the very common h264+ac3 mkv. Startup instrumentation for time-to-first-frame diagnosis: - remux branch logs [probe=.. spawn=..] - transcodeSource logs 'first fMP4 bytes after ..' (ffmpeg → first output) - serveGrowing logs reads that block >250ms (client seeking ahead of the live edge) + the first read's offset vs produced/estimated size. Verified: caps gate (hls without caps, remux with), hvc1 retag (ffprobe of the /stream output = hevc/hvc1), HEVC playback confirmed on a real iPhone Safari over Tailscale. LAN timeline: probe 16ms, spawn 1ms, first byte 201ms, no serveGrowing blocks.
1268 lines
41 KiB
Go
1268 lines
41 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/anacrolix/torrent"
|
|
)
|
|
|
|
// StreamURLs holds all available stream URLs keyed by network type.
|
|
// Serialized as JSON into the stream_url DB field so the web API can
|
|
// pick the best URL based on the browser's IP address.
|
|
type StreamURLs struct {
|
|
LAN string `json:"lan,omitempty"`
|
|
Tailscale string `json:"ts,omitempty"`
|
|
Public string `json:"pub,omitempty"`
|
|
}
|
|
|
|
// FileProvider abstracts where to get a file reader for streaming.
|
|
type FileProvider interface {
|
|
NewFileReader(ctx context.Context) io.ReadSeekCloser
|
|
FileName() string
|
|
FileSize() int64
|
|
}
|
|
|
|
// GrowingSource is a /stream source whose bytes are produced over time by an
|
|
// ffmpeg remux/transcode to a temp file (see transcodeSource). It is served
|
|
// via manual Range handling (serveGrowing) instead of http.ServeContent,
|
|
// which assumes a complete, fixed-size, seekable file. Used by direct-play's
|
|
// remux path (hueco #3 / 3b): mkv h264/aac → progressive fMP4, no re-encode.
|
|
type GrowingSource interface {
|
|
// ReadAt blocks until off+len(p) bytes have been produced, the source is
|
|
// final, or a timeout elapses; near the live edge it returns a short
|
|
// (n>0, nil) read so the caller can stream what exists so far.
|
|
ReadAt(p []byte, off int64) (int, error)
|
|
Size() int64 // bytes produced so far
|
|
Final() bool // ffmpeg exited — Size() is now the true total
|
|
EstimatedSize() int64 // expected final size, for the scrubber timeline
|
|
FileName() string
|
|
Close() error
|
|
}
|
|
|
|
// StreamServer is a persistent HTTP server that serves one file at a time.
|
|
// Start it once with Listen(), then swap files with SetFile()/ClearFile().
|
|
// The server stays alive for the entire daemon lifecycle — no port churn.
|
|
type StreamServer struct {
|
|
mu sync.RWMutex
|
|
provider FileProvider
|
|
growing GrowingSource // set instead of provider for the progressive-remux path (3b)
|
|
taskID string // current task being streamed
|
|
|
|
server *http.Server
|
|
port int
|
|
url string // best single URL (backward compat)
|
|
urls StreamURLs // all available URLs by network type
|
|
upnpMapping *UPnPMapping
|
|
// enableUPnP gates whether Listen() asks the gateway to publish the
|
|
// stream port to the WAN. UPnP is opt-in (false by default) because
|
|
// /stream and /hls have no auth — exposing them on the public internet
|
|
// would let any scanner enumerate active downloads. LAN and Tailscale
|
|
// access keep working without UPnP.
|
|
enableUPnP bool
|
|
// corsExtraOrigins are operator-configured origins added to the default
|
|
// allowlist defined in validate.go. Set before Listen().
|
|
corsExtraOrigins []string
|
|
// corsAllowlist is computed at Listen() time and treated as read-only
|
|
// thereafter so per-request reads need no locking.
|
|
corsAllowlist map[string]struct{}
|
|
|
|
hls *HLSSessionRegistry // HLS sessions served on /hls/<id>/...
|
|
|
|
// streamSecret signs the per-URL stream tokens (see stream_token.go). In
|
|
// memory only; regenerated each daemon start. requireToken gates whether
|
|
// remote (non-loopback) /stream and /hls requests must carry a valid token.
|
|
streamSecret []byte
|
|
requireToken bool
|
|
|
|
lastActivity atomic.Int64
|
|
maxByteOffset atomic.Int64 // highest sequential read position (main playback connection)
|
|
totalFileSize atomic.Int64
|
|
bitrateBps atomic.Int64 // video bitrate in bits/sec (from ffprobe, 0 = unknown)
|
|
durationSec atomic.Int64 // video duration in seconds (from ffprobe, 0 = unknown)
|
|
topReaderID atomic.Int64 // ID of the reader that set maxByteOffset (only it can advance it)
|
|
readerCounter atomic.Int64 // monotonic counter for assigning reader IDs
|
|
}
|
|
|
|
// NewStreamServer creates a stream server bound to the given port.
|
|
// Call Listen() to start accepting connections, then SetFile() to serve content.
|
|
//
|
|
// UPnP is opt-in: call SetUPnPEnabled(true) before Listen() to publish the
|
|
// stream port on the WAN. Without it, only LAN and Tailscale clients can
|
|
// reach the server. This matches the security default — /stream and /hls
|
|
// have no auth, so exposing them to the public internet is something the
|
|
// operator must explicitly request.
|
|
func NewStreamServer(port int) *StreamServer {
|
|
return &StreamServer{
|
|
port: port,
|
|
hls: NewHLSSessionRegistry(),
|
|
streamSecret: newStreamSecret(),
|
|
requireToken: true, // secure by default; the agent self-mints tokens
|
|
}
|
|
}
|
|
|
|
// StreamSecretHex returns the daemon's stream-token signing key as hex, so it
|
|
// can be reported to the web (which mints the HLS path token the agent then
|
|
// verifies). Treat as a secret — it lets the holder mint valid stream tokens.
|
|
func (ss *StreamServer) StreamSecretHex() string {
|
|
return hex.EncodeToString(ss.streamSecret)
|
|
}
|
|
|
|
// SetRequireStreamToken toggles remote stream-token enforcement. Loopback
|
|
// callers are always exempt. Call before Listen() / before reporting URLs.
|
|
// Default is true; an operator can disable it via config for debugging.
|
|
func (ss *StreamServer) SetRequireStreamToken(require bool) {
|
|
ss.requireToken = require
|
|
}
|
|
|
|
// checkStreamToken reports whether a request may proceed: always true when
|
|
// enforcement is off; otherwise the token must be a valid signature for scope.
|
|
// No loopback exemption — cloudflared relays public funnel traffic over
|
|
// localhost, so loopback is not a trust signal.
|
|
func (ss *StreamServer) checkStreamToken(scope, token string) bool {
|
|
if !ss.requireToken {
|
|
return true
|
|
}
|
|
return verifyStreamToken(ss.streamSecret, scope, token, time.Now())
|
|
}
|
|
|
|
// SetUPnPEnabled toggles WAN publishing of the stream port. Call before
|
|
// Listen(); changes after Listen() are ignored for the active server.
|
|
func (ss *StreamServer) SetUPnPEnabled(enabled bool) {
|
|
ss.enableUPnP = enabled
|
|
}
|
|
|
|
// SetCORSAllowedOrigins replaces the operator-supplied extra origins. The
|
|
// default allowlist (torrentclaw.com / app.torrentclaw.com / localhost dev
|
|
// ports) is always merged in. Call before Listen().
|
|
func (ss *StreamServer) SetCORSAllowedOrigins(origins []string) {
|
|
ss.corsExtraOrigins = origins
|
|
}
|
|
|
|
// writeCORSHeaders writes the per-origin CORS response headers when the
|
|
// request carries an Origin header that matches the allowlist. Returns true
|
|
// if the handler must short-circuit (preflight OPTIONS). Media-tag requests
|
|
// (no Origin header) bypass this entirely.
|
|
//
|
|
// `Vary: Origin` is emitted whenever an Origin header is present (matched
|
|
// or not) so any intermediate cache keys the response per-origin and a
|
|
// later request with a different origin cannot be served a stale ACAO.
|
|
func (ss *StreamServer) writeCORSHeaders(w http.ResponseWriter, r *http.Request, expose string) (preflight bool) {
|
|
origin := r.Header.Get("Origin")
|
|
if origin == "" {
|
|
return false
|
|
}
|
|
w.Header().Add("Vary", "Origin")
|
|
if _, ok := ss.corsAllowlist[origin]; !ok {
|
|
// Unknown origin — do not emit CORS headers so the browser blocks
|
|
// the response. Still return without short-circuiting so a non-CORS
|
|
// caller (e.g. curl) keeps working.
|
|
return false
|
|
}
|
|
w.Header().Set("Access-Control-Allow-Origin", origin)
|
|
w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS")
|
|
w.Header().Set("Access-Control-Allow-Headers", "Range")
|
|
if expose != "" {
|
|
w.Header().Set("Access-Control-Expose-Headers", expose)
|
|
}
|
|
if r.Method == http.MethodOptions {
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// HLS returns the HLS session registry for this server. Daemon code uses it
|
|
// to register a session when the backend asks for HLS playback.
|
|
func (ss *StreamServer) HLS() *HLSSessionRegistry { return ss.hls }
|
|
|
|
// Listen starts the HTTP server on the configured port. Call once at daemon startup.
|
|
func (ss *StreamServer) Listen(ctx context.Context) error {
|
|
// Freeze the CORS allowlist before the first request can land. After
|
|
// this point the map is treated as read-only so handlers can probe it
|
|
// without locking.
|
|
ss.corsAllowlist = buildCORSAllowlist(ss.corsExtraOrigins)
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/stream", ss.handler)
|
|
mux.HandleFunc("/health", ss.healthHandler)
|
|
mux.HandleFunc("/playlist.m3u", ss.playlistHandler)
|
|
mux.HandleFunc("/hls/", ss.hlsHandler)
|
|
|
|
// SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart)
|
|
lc := net.ListenConfig{
|
|
Control: func(network, address string, c syscall.RawConn) error {
|
|
return c.Control(func(fd uintptr) {
|
|
_ = setReuseAddr(fd)
|
|
})
|
|
},
|
|
}
|
|
|
|
// Try configured port; if busy, try next ports (heartbeat reports actual port to web)
|
|
var listener net.Listener
|
|
var listenErr error
|
|
basePort := ss.port
|
|
for attempt := 0; attempt < 10; attempt++ {
|
|
addr := fmt.Sprintf("0.0.0.0:%d", ss.port)
|
|
listener, listenErr = lc.Listen(ctx, "tcp", addr)
|
|
if listenErr == nil {
|
|
break
|
|
}
|
|
if !strings.Contains(listenErr.Error(), "address already in use") {
|
|
return fmt.Errorf("stream server listen on %s: %w", addr, listenErr)
|
|
}
|
|
ss.port++
|
|
log.Printf("[stream] port %d in use, trying %d", ss.port-1, ss.port)
|
|
}
|
|
if listenErr != nil {
|
|
return fmt.Errorf("stream server: all ports busy (%d-%d): %w", basePort, ss.port, listenErr)
|
|
}
|
|
if ss.port != basePort {
|
|
log.Printf("[stream] using port %d (configured %d was busy)", ss.port, basePort)
|
|
}
|
|
|
|
ss.port = listener.Addr().(*net.TCPAddr).Port
|
|
|
|
// Collect all reachable URLs by network type
|
|
if lanIP := LanIP(); lanIP != "" {
|
|
ss.urls.LAN = fmt.Sprintf("http://%s:%d/stream", lanIP, ss.port)
|
|
}
|
|
if tsIP := TailscaleIP(); tsIP != "" {
|
|
ss.urls.Tailscale = fmt.Sprintf("http://%s:%d/stream", tsIP, ss.port)
|
|
}
|
|
if ss.enableUPnP {
|
|
mapping, err := SetupUPnP(ss.port)
|
|
if err != nil {
|
|
log.Printf("[stream] UPnP setup failed: %v (only LAN/Tailscale clients will reach port %d)", err, ss.port)
|
|
} else {
|
|
ss.upnpMapping = mapping
|
|
ss.urls.Public = fmt.Sprintf("http://%s:%d/stream", mapping.ExternalIP, mapping.ExternalPort)
|
|
}
|
|
} else {
|
|
log.Printf("[stream] UPnP disabled — port %d not published to WAN (set downloads.enable_upnp = true to opt in)", ss.port)
|
|
}
|
|
|
|
// Best single URL for backward compat: Tailscale > LAN > Public > localhost
|
|
switch {
|
|
case ss.urls.Tailscale != "":
|
|
ss.url = ss.urls.Tailscale
|
|
case ss.urls.LAN != "":
|
|
ss.url = ss.urls.LAN
|
|
case ss.urls.Public != "":
|
|
ss.url = ss.urls.Public
|
|
default:
|
|
ss.url = fmt.Sprintf("http://127.0.0.1:%d/stream", ss.port)
|
|
ss.urls.LAN = ss.url
|
|
}
|
|
|
|
ss.server = &http.Server{
|
|
Handler: mux,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
if err := ss.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
|
log.Printf("stream server error: %v", err)
|
|
}
|
|
}()
|
|
|
|
log.Printf("[stream] server listening on port %d", ss.port)
|
|
return nil
|
|
}
|
|
|
|
// SetFile atomically swaps the file being served and resets progress tracking.
|
|
func (ss *StreamServer) SetFile(provider FileProvider, taskID string) {
|
|
ss.mu.Lock()
|
|
prevGrowing := ss.growing
|
|
ss.provider = provider
|
|
ss.growing = nil // a raw-file provider supersedes any in-flight remux
|
|
ss.taskID = taskID
|
|
ss.mu.Unlock()
|
|
if prevGrowing != nil {
|
|
_ = prevGrowing.Close() // stop the orphan ffmpeg + drop its temp file
|
|
}
|
|
ss.totalFileSize.Store(provider.FileSize())
|
|
ss.lastActivity.Store(time.Now().UnixNano())
|
|
ss.maxByteOffset.Store(0)
|
|
ss.topReaderID.Store(0)
|
|
ss.bitrateBps.Store(0)
|
|
ss.durationSec.Store(0)
|
|
|
|
// Probe bitrate + duration synchronously so rate-limiting and duration
|
|
// are available before the first HTTP request arrives.
|
|
if dp, ok := provider.(*diskFileProvider); ok {
|
|
pm := probeMediaInfo(dp.path)
|
|
if pm.bitrateBps > 0 {
|
|
ss.bitrateBps.Store(pm.bitrateBps)
|
|
log.Printf("[stream] detected bitrate: %.1f Mbps → throttle at %.1f Mbps",
|
|
float64(pm.bitrateBps)/1e6, float64(pm.bitrateBps)*2/1e6)
|
|
}
|
|
if pm.durationSec > 0 {
|
|
ss.durationSec.Store(pm.durationSec)
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetGrowingFile serves a progressive-remux source on /stream (hueco #3 / 3b):
|
|
// ffmpeg `-c copy` mkv→fMP4 to a growing temp file, range-served via
|
|
// serveGrowing. Supersedes any prior provider/growing source (single-viewer).
|
|
func (ss *StreamServer) SetGrowingFile(src GrowingSource, taskID string) {
|
|
ss.mu.Lock()
|
|
prevGrowing := ss.growing
|
|
ss.growing = src
|
|
ss.provider = nil
|
|
ss.taskID = taskID
|
|
ss.mu.Unlock()
|
|
if prevGrowing != nil {
|
|
_ = prevGrowing.Close()
|
|
}
|
|
ss.totalFileSize.Store(src.EstimatedSize())
|
|
ss.lastActivity.Store(time.Now().UnixNano())
|
|
ss.maxByteOffset.Store(0)
|
|
ss.topReaderID.Store(0)
|
|
// Rate-limit + bitrate tracking are for raw-file playback; the remux pump
|
|
// has its own pacing (ffmpeg copy is I/O-bound), so leave them at zero.
|
|
ss.bitrateBps.Store(0)
|
|
ss.durationSec.Store(0)
|
|
}
|
|
|
|
// ClearFile stops serving any file. Subsequent requests return 404.
|
|
func (ss *StreamServer) ClearFile() {
|
|
ss.mu.Lock()
|
|
ss.provider = nil
|
|
prevGrowing := ss.growing
|
|
ss.growing = nil
|
|
ss.taskID = ""
|
|
ss.mu.Unlock()
|
|
if prevGrowing != nil {
|
|
_ = prevGrowing.Close()
|
|
}
|
|
ss.totalFileSize.Store(0)
|
|
ss.maxByteOffset.Store(0)
|
|
ss.topReaderID.Store(0)
|
|
ss.bitrateBps.Store(0)
|
|
ss.durationSec.Store(0)
|
|
}
|
|
|
|
// CurrentTaskID returns the task ID of the file currently being served.
|
|
func (ss *StreamServer) CurrentTaskID() string {
|
|
ss.mu.RLock()
|
|
defer ss.mu.RUnlock()
|
|
return ss.taskID
|
|
}
|
|
|
|
// HasFile returns true if a file (raw provider or growing remux) is being served.
|
|
func (ss *StreamServer) HasFile() bool {
|
|
ss.mu.RLock()
|
|
defer ss.mu.RUnlock()
|
|
return ss.provider != nil || ss.growing != nil
|
|
}
|
|
|
|
// URL returns the best single stream URL (backward compat).
|
|
// URL returns the best single /stream URL, carrying a `?t=` token when
|
|
// enforcement is on. This is what the one-shot `unarr stream` hands to the
|
|
// player — and since the best URL is the Tailscale/LAN address (not loopback),
|
|
// it must be tokenised or a remote-addressed player would be rejected.
|
|
func (ss *StreamServer) URL() string { return ss.tokenizeStreamURL(ss.url) }
|
|
|
|
// tokenizeStreamURL appends a freshly-minted `?t=<token>` (scope "stream") to a
|
|
// /stream URL. No-op when the URL is empty or enforcement is off.
|
|
func (ss *StreamServer) tokenizeStreamURL(u string) string {
|
|
if u == "" || !ss.requireToken {
|
|
return u
|
|
}
|
|
sep := "?"
|
|
if strings.Contains(u, "?") {
|
|
sep = "&"
|
|
}
|
|
return u + sep + "t=" + mintStreamToken(ss.streamSecret, streamScopeStream, time.Now())
|
|
}
|
|
|
|
// URLsJSON returns all available stream URLs as a JSON string, each carrying a
|
|
// freshly-minted `?t=` stream token when enforcement is on. The web reports
|
|
// these verbatim to the browser (pass-through), so the token reaches the
|
|
// player without any web-side minting.
|
|
func (ss *StreamServer) URLsJSON() string {
|
|
b, _ := json.Marshal(ss.tokenizedStreamURLs())
|
|
return string(b)
|
|
}
|
|
|
|
// tokenizedStreamURLs appends a `?t=<token>` (scope "stream") to each non-empty
|
|
// /stream URL. No-op when enforcement is off.
|
|
func (ss *StreamServer) tokenizedStreamURLs() StreamURLs {
|
|
if !ss.requireToken {
|
|
return ss.urls
|
|
}
|
|
return StreamURLs{
|
|
LAN: ss.tokenizeStreamURL(ss.urls.LAN),
|
|
Tailscale: ss.tokenizeStreamURL(ss.urls.Tailscale),
|
|
Public: ss.tokenizeStreamURL(ss.urls.Public),
|
|
}
|
|
}
|
|
|
|
// Port returns the bound port.
|
|
func (ss *StreamServer) Port() int { return ss.port }
|
|
|
|
// IdleSince returns how long since the last HTTP request was received.
|
|
func (ss *StreamServer) IdleSince() time.Duration {
|
|
last := ss.lastActivity.Load()
|
|
if last == 0 {
|
|
return 0
|
|
}
|
|
return time.Since(time.Unix(0, last))
|
|
}
|
|
|
|
// Shutdown gracefully stops the HTTP server and removes the UPnP port mapping.
|
|
// Call only at daemon shutdown — NOT between file swaps.
|
|
func (ss *StreamServer) Shutdown(ctx context.Context) error {
|
|
ss.upnpMapping.Remove()
|
|
if ss.hls != nil {
|
|
ss.hls.CloseAll()
|
|
}
|
|
if ss.server != nil {
|
|
return ss.server.Shutdown(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// hlsBaseURLs returns the per-network HLS base URLs for a given session.
|
|
// The web client picks the first reachable one — same fallback strategy as
|
|
// the legacy /stream URLs.
|
|
func (ss *StreamServer) hlsBaseURLs(sessionID string) StreamURLs {
|
|
// Token rides as a path segment so the playlists' relative child URIs
|
|
// (video/index.m3u8, seg-N.m4s, subs/…) inherit it via relative resolution.
|
|
base := "/hls/" + sessionID
|
|
if ss.requireToken {
|
|
base += "/" + mintStreamToken(ss.streamSecret, streamScopeHLS(sessionID), time.Now())
|
|
}
|
|
var out StreamURLs
|
|
if ss.urls.LAN != "" {
|
|
out.LAN = strings.Replace(ss.urls.LAN, "/stream", base, 1)
|
|
}
|
|
if ss.urls.Tailscale != "" {
|
|
out.Tailscale = strings.Replace(ss.urls.Tailscale, "/stream", base, 1)
|
|
}
|
|
if ss.urls.Public != "" {
|
|
out.Public = strings.Replace(ss.urls.Public, "/stream", base, 1)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// HLSURLsJSON returns base URLs for an HLS session as a JSON string for the
|
|
// session response payload.
|
|
func (ss *StreamServer) HLSURLsJSON(sessionID string) string {
|
|
urls := ss.hlsBaseURLs(sessionID)
|
|
b, _ := json.Marshal(urls)
|
|
return string(b)
|
|
}
|
|
|
|
// hlsHandler routes /hls/<sessionID>/<resource> to the matching HLSSession.
|
|
//
|
|
// Recognised resources:
|
|
//
|
|
// master.m3u8 — top-level playlist
|
|
// video/index.m3u8 — video media playlist
|
|
// video/init.mp4 — fMP4 init segment
|
|
// video/seg-<n>.m4s — video segment
|
|
// subs/sub-<n>.m3u8 — per-subtitle media playlist (synthesised)
|
|
// subs/sub-<n>.vtt — WebVTT subtitle (extracted by ffmpeg)
|
|
func (ss *StreamServer) hlsHandler(w http.ResponseWriter, r *http.Request) {
|
|
ss.lastActivity.Store(time.Now().UnixNano())
|
|
|
|
if ss.writeCORSHeaders(w, r, "Content-Length, Content-Range, Accept-Ranges") {
|
|
return
|
|
}
|
|
|
|
rest := strings.TrimPrefix(r.URL.Path, "/hls/")
|
|
parts := strings.SplitN(rest, "/", 2)
|
|
if len(parts) == 0 || parts[0] == "" {
|
|
http.Error(w, "missing session id", http.StatusNotFound)
|
|
return
|
|
}
|
|
sessionID := parts[0]
|
|
// Reject malformed IDs with the same 404 we return for unknown sessions —
|
|
// no oracle for the accepted format.
|
|
if !validSessionID.MatchString(sessionID) {
|
|
http.Error(w, "hls session not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
remainder := ""
|
|
if len(parts) > 1 {
|
|
remainder = parts[1]
|
|
}
|
|
// Auth: when enforcement is on, the URL is /hls/<sessionID>/<token>/<resource>.
|
|
// Peel the token segment and verify it (no loopback exemption — funnel
|
|
// traffic arrives over localhost). 404 on mismatch — same response as an
|
|
// unknown session, no oracle.
|
|
if ss.requireToken {
|
|
sub := strings.SplitN(remainder, "/", 2)
|
|
if !verifyStreamToken(ss.streamSecret, streamScopeHLS(sessionID), sub[0], time.Now()) {
|
|
http.Error(w, "hls session not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
if len(sub) < 2 {
|
|
http.Error(w, "missing resource", http.StatusNotFound)
|
|
return
|
|
}
|
|
remainder = sub[1]
|
|
}
|
|
session := ss.hls.Get(sessionID)
|
|
if session == nil {
|
|
http.Error(w, "hls session not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
if remainder == "" {
|
|
http.Error(w, "missing resource", http.StatusNotFound)
|
|
return
|
|
}
|
|
resource := remainder
|
|
|
|
switch {
|
|
case resource == "master.m3u8":
|
|
session.ServeMaster(w, r)
|
|
case resource == "probe.json":
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
_ = json.NewEncoder(w).Encode(session.ProbeInfo())
|
|
case resource == "video/index.m3u8":
|
|
session.ServeVideoPlaylist(w, r)
|
|
case resource == "video/init.mp4":
|
|
session.ServeInit(w, r)
|
|
case strings.HasPrefix(resource, "video/seg-") && strings.HasSuffix(resource, ".m4s"):
|
|
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "video/seg-"), ".m4s")
|
|
idx, err := strconv.Atoi(idxStr)
|
|
if err != nil {
|
|
http.Error(w, "bad segment index", http.StatusBadRequest)
|
|
return
|
|
}
|
|
session.ServeSegment(w, r, idx)
|
|
case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".m3u8"):
|
|
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".m3u8")
|
|
idx, err := strconv.Atoi(idxStr)
|
|
if err != nil {
|
|
http.Error(w, "bad subtitle index", http.StatusBadRequest)
|
|
return
|
|
}
|
|
ss.serveSubtitlePlaylist(w, r, session, idx)
|
|
case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".vtt"):
|
|
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".vtt")
|
|
idx, err := strconv.Atoi(idxStr)
|
|
if err != nil {
|
|
http.Error(w, "bad subtitle index", http.StatusBadRequest)
|
|
return
|
|
}
|
|
session.ServeSubtitle(w, r, idx)
|
|
default:
|
|
http.Error(w, "unknown hls resource", http.StatusNotFound)
|
|
}
|
|
}
|
|
|
|
// serveSubtitlePlaylist generates a single-VTT-segment HLS playlist on the
|
|
// fly so hls.js can consume it as a regular subtitle rendition. The VTT file
|
|
// itself is extracted asynchronously by HLSSession.extractSubtitles.
|
|
func (ss *StreamServer) serveSubtitlePlaylist(w http.ResponseWriter, r *http.Request, session *HLSSession, idx int) {
|
|
if idx < 0 || idx >= len(session.probe.SubtitleTracks) {
|
|
http.Error(w, "subtitle out of range", http.StatusNotFound)
|
|
return
|
|
}
|
|
dur := session.durationSec
|
|
if dur < 1 {
|
|
dur = 1
|
|
}
|
|
body := strings.Builder{}
|
|
body.WriteString("#EXTM3U\n")
|
|
body.WriteString("#EXT-X-VERSION:3\n")
|
|
body.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n")
|
|
body.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(dur)+1))
|
|
body.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n")
|
|
body.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", dur))
|
|
body.WriteString(fmt.Sprintf("sub-%d.vtt\n", idx))
|
|
body.WriteString("#EXT-X-ENDLIST\n")
|
|
|
|
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
_, _ = io.WriteString(w, body.String())
|
|
}
|
|
|
|
// healthHandler responde con el estado del servidor en JSON.
|
|
// Útil para diagnosticar conectividad desde redes remotas o Tailscale:
|
|
//
|
|
// curl http://<tailscale-ip>:<port>/health
|
|
func (ss *StreamServer) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|
if ss.writeCORSHeaders(w, r, "") {
|
|
return
|
|
}
|
|
ss.mu.RLock()
|
|
provider := ss.provider
|
|
taskID := ss.taskID
|
|
ss.mu.RUnlock()
|
|
|
|
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
|
// Only expose filename/taskID/client to loopback callers (local diagnostics).
|
|
// Remote callers (LAN, Tailscale, UPnP public) get a minimal probe response
|
|
// so that scanners and unauthenticated peers cannot fingerprint the active
|
|
// download. The web stream-probe only checks HTTP 200 + Content-Type.
|
|
//
|
|
// Use net.IP.IsLoopback so we also accept ::ffff:127.0.0.1 (Linux dual-stack
|
|
// IPv4-mapped form) and reject the empty-string fallthrough when
|
|
// SplitHostPort fails on a malformed RemoteAddr — both would otherwise
|
|
// silently bypass the disclosure boundary.
|
|
parsedIP := net.ParseIP(clientIP)
|
|
isLocal := parsedIP != nil && parsedIP.IsLoopback()
|
|
|
|
type healthResponse struct {
|
|
Status string `json:"status"`
|
|
Streaming bool `json:"streaming"`
|
|
File string `json:"file,omitempty"`
|
|
Task string `json:"task,omitempty"`
|
|
Port int `json:"port"`
|
|
Client string `json:"client,omitempty"`
|
|
}
|
|
resp := healthResponse{
|
|
Status: "ok",
|
|
Port: ss.port,
|
|
}
|
|
if provider != nil {
|
|
resp.Streaming = true
|
|
}
|
|
if isLocal {
|
|
resp.Client = clientIP
|
|
if provider != nil {
|
|
resp.File = provider.FileName()
|
|
resp.Task = taskID
|
|
if len(resp.Task) > 8 {
|
|
resp.Task = resp.Task[:8]
|
|
}
|
|
}
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
json.NewEncoder(w).Encode(resp) //nolint:errcheck
|
|
}
|
|
|
|
// playlistHandler generates an M3U playlist for VLC with #EXTVLCOPT language hints.
|
|
// Query params: audioLangs (comma-sep), subLangs (comma-sep), resumeSec, title, streamUrl.
|
|
// If streamUrl is omitted, uses the current best stream URL.
|
|
//
|
|
// VLC fetches this playlist and applies the EXTVLCOPT directives automatically,
|
|
// enabling automatic audio/subtitle track selection on all VLC platforms (desktop + mobile).
|
|
func (ss *StreamServer) playlistHandler(w http.ResponseWriter, r *http.Request) {
|
|
if ss.writeCORSHeaders(w, r, "") {
|
|
return
|
|
}
|
|
|
|
q := r.URL.Query()
|
|
|
|
// Sanitize query params: strip CR/LF to prevent M3U directive injection.
|
|
sanitize := func(s string) string {
|
|
s = strings.ReplaceAll(s, "\n", "")
|
|
s = strings.ReplaceAll(s, "\r", "")
|
|
return s
|
|
}
|
|
|
|
audioLangs := sanitize(q.Get("audioLangs"))
|
|
subLangs := sanitize(q.Get("subLangs"))
|
|
resumeSec := sanitize(q.Get("resumeSec"))
|
|
title := sanitize(q.Get("title"))
|
|
streamURL := q.Get("streamUrl")
|
|
// Only accept http(s) URLs to prevent file:// or other URI schemes in the playlist.
|
|
if streamURL != "" && !strings.HasPrefix(streamURL, "http://") && !strings.HasPrefix(streamURL, "https://") {
|
|
streamURL = ""
|
|
}
|
|
if streamURL == "" {
|
|
// No self-minting fallback: returning a freshly-tokenised URL for a
|
|
// param-less request would make /playlist.m3u an open token oracle
|
|
// (any caller could fetch a valid /stream?t=… here). The web always
|
|
// passes an already-tokenised streamUrl param; the playlist just echoes
|
|
// it — the real auth gate is /stream itself.
|
|
http.Error(w, "no active stream", http.StatusNotFound)
|
|
return
|
|
}
|
|
if title == "" {
|
|
title = "TorrentClaw Stream"
|
|
}
|
|
|
|
var b strings.Builder
|
|
b.WriteString("#EXTM3U\n")
|
|
b.WriteString(fmt.Sprintf("#EXTINF:-1,%s\n", title))
|
|
if audioLangs != "" {
|
|
b.WriteString(fmt.Sprintf("#EXTVLCOPT:audio-language=%s\n", audioLangs))
|
|
}
|
|
if subLangs != "" {
|
|
b.WriteString(fmt.Sprintf("#EXTVLCOPT:sub-language=%s\n", subLangs))
|
|
}
|
|
if resumeSec != "" && resumeSec != "0" {
|
|
b.WriteString(fmt.Sprintf("#EXTVLCOPT:start-time=%s\n", resumeSec))
|
|
}
|
|
b.WriteString("#EXTVLCOPT:network-caching=30000\n")
|
|
b.WriteString(streamURL + "\n")
|
|
|
|
w.Header().Set("Content-Type", "audio/x-mpegurl")
|
|
w.Header().Set("Content-Disposition", `inline; filename="stream.m3u"`)
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
fmt.Fprint(w, b.String()) //nolint:errcheck
|
|
}
|
|
|
|
func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
|
ss.lastActivity.Store(time.Now().UnixNano())
|
|
|
|
// Log every incoming request — essential for diagnosing remote/Tailscale issues.
|
|
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
|
log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range"))
|
|
|
|
// Get current source (raw provider or growing remux; nil if none).
|
|
ss.mu.RLock()
|
|
provider := ss.provider
|
|
growing := ss.growing
|
|
ss.mu.RUnlock()
|
|
|
|
if provider == nil && growing == nil {
|
|
http.Error(w, "no active stream", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
if ss.writeCORSHeaders(w, r, "Content-Length, Content-Range, Accept-Ranges") {
|
|
return
|
|
}
|
|
|
|
// Auth: every caller must carry a valid stream token. 404 (not 401/403) so
|
|
// an unauthorised caller gets no oracle that a stream is active here.
|
|
if !ss.checkStreamToken(streamScopeStream, r.URL.Query().Get("t")) {
|
|
log.Printf("[stream] rejected %s — bad/absent token", clientIP)
|
|
http.Error(w, "no active stream", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Progressive-remux path (3b): a growing fMP4 produced by ffmpeg `-c copy`.
|
|
// Range-served manually because http.ServeContent needs a complete file.
|
|
if growing != nil {
|
|
ss.serveGrowing(w, r, growing)
|
|
return
|
|
}
|
|
|
|
rawReader := provider.NewFileReader(r.Context())
|
|
if rawReader == nil {
|
|
http.Error(w, "file not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
defer rawReader.Close()
|
|
|
|
// Wrap reader to track bytes read for progress estimation + rate limit.
|
|
// Rate limiting at ~2x bitrate ensures VLC can't download far ahead of
|
|
// playback, so bytes-read ≈ playback position (like Netflix/YouTube).
|
|
bps := ss.bitrateBps.Load()
|
|
var bytesPerSec int64
|
|
if bps > 0 {
|
|
bytesPerSec = bps / 8 * 2 // 2x bitrate in bytes/sec
|
|
}
|
|
var burstSize int64
|
|
if bytesPerSec > 0 {
|
|
burstSize = bytesPerSec * 30
|
|
}
|
|
reader := &trackingReader{
|
|
inner: rawReader,
|
|
server: ss,
|
|
id: ss.readerCounter.Add(1),
|
|
bytesPerSec: bytesPerSec,
|
|
burstSize: burstSize,
|
|
tokens: burstSize,
|
|
lastFill: time.Now(),
|
|
}
|
|
|
|
w.Header().Set("Content-Type", mimeTypeFromExt(provider.FileName()))
|
|
// "inline" for play requests (VLC/mpv), "attachment" for download requests.
|
|
disposition := "inline"
|
|
if r.URL.Query().Get("download") == "1" {
|
|
disposition = "attachment"
|
|
}
|
|
downloadName := provider.FileName()
|
|
if disposition == "attachment" {
|
|
ext := filepath.Ext(downloadName)
|
|
downloadName = strings.TrimSuffix(downloadName, ext) + " [TorrentClaw]" + ext
|
|
}
|
|
w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%q", disposition, downloadName))
|
|
w.Header().Set("Accept-Ranges", "bytes")
|
|
|
|
http.ServeContent(w, r, provider.FileName(), time.Time{}, reader)
|
|
}
|
|
|
|
// serveGrowing range-serves a growing remux source (hueco #3 / 3b). Unlike
|
|
// http.ServeContent it can't rely on a fixed file size: ffmpeg `-c copy` is
|
|
// still writing, and the final byte count isn't known until it exits. So we:
|
|
//
|
|
// - advertise an ESTIMATED total (≈ source file size for a copy remux) in
|
|
// Content-Range so the browser scrubber has a timeline;
|
|
// - reply 206 and stream from the requested offset, blocking via ReadAt for
|
|
// not-yet-produced bytes, until the explicit range end or the real EOF;
|
|
// - send the body chunked (no Content-Length) for non-final sources, since
|
|
// the true length differs from the estimate — promising an exact length we
|
|
// can't fulfil would hang the browser. When the source is already final we
|
|
// send an exact Content-Length.
|
|
//
|
|
// Seeking forward into a not-yet-remuxed region blocks briefly until the copy
|
|
// (I/O-bound, fast) catches up; seeking back to produced bytes is immediate.
|
|
func (ss *StreamServer) serveGrowing(w http.ResponseWriter, r *http.Request, src GrowingSource) {
|
|
w.Header().Set("Accept-Ranges", "bytes")
|
|
w.Header().Set("Content-Type", "video/mp4")
|
|
w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", src.FileName()))
|
|
|
|
// Total to advertise: exact when ffmpeg has exited, else the estimate.
|
|
total := src.EstimatedSize()
|
|
if src.Final() {
|
|
total = src.Size()
|
|
}
|
|
if total <= 0 {
|
|
total = src.Size()
|
|
}
|
|
|
|
start, explicitEnd := parseByteRange(r.Header.Get("Range"))
|
|
if total > 0 && start >= total {
|
|
// Range beyond what we expect to produce — let the browser recover.
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", total))
|
|
http.Error(w, "range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
|
|
return
|
|
}
|
|
|
|
if r.Method == http.MethodHead {
|
|
if total > 0 {
|
|
w.Header().Set("Content-Length", strconv.FormatInt(total, 10))
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
end := total - 1
|
|
if explicitEnd >= 0 && explicitEnd < end {
|
|
end = explicitEnd
|
|
}
|
|
if total > 0 {
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total))
|
|
}
|
|
// Exact Content-Length only when the source is final (true size known) so
|
|
// we never promise bytes a still-running remux might not produce.
|
|
if src.Final() && explicitEnd < 0 {
|
|
w.Header().Set("Content-Length", strconv.FormatInt(src.Size()-start, 10))
|
|
}
|
|
w.WriteHeader(http.StatusPartialContent)
|
|
|
|
buf := make([]byte, 256*1024)
|
|
off := start
|
|
firstRead := true
|
|
for {
|
|
if explicitEnd >= 0 && off > explicitEnd {
|
|
return
|
|
}
|
|
if r.Context().Err() != nil {
|
|
return // client disconnected / request cancelled
|
|
}
|
|
readStart := time.Now()
|
|
n, err := src.ReadAt(buf, off)
|
|
// TTFF diagnosis: a read that blocks means the client asked for bytes the
|
|
// remux hasn't produced yet (a seek ahead of the live edge, or the very
|
|
// first read before ffmpeg's init lands). Log it so a slow start is
|
|
// attributable to "waiting on ffmpeg" vs network/decoder.
|
|
if waited := time.Since(readStart); waited > 250*time.Millisecond {
|
|
log.Printf("[stream] serveGrowing read off=%d blocked %v (produced=%d est=%d)",
|
|
off, waited.Round(time.Millisecond), src.Size(), src.EstimatedSize())
|
|
} else if firstRead {
|
|
log.Printf("[stream] serveGrowing start off=%d (produced=%d est=%d)", start, src.Size(), src.EstimatedSize())
|
|
}
|
|
firstRead = false
|
|
if n > 0 {
|
|
toWrite := n
|
|
if explicitEnd >= 0 {
|
|
if remaining := explicitEnd - off + 1; int64(toWrite) > remaining {
|
|
toWrite = int(remaining)
|
|
}
|
|
}
|
|
if _, werr := w.Write(buf[:toWrite]); werr != nil {
|
|
return // client gone
|
|
}
|
|
off += int64(toWrite)
|
|
if f, ok := w.(http.Flusher); ok {
|
|
f.Flush()
|
|
}
|
|
}
|
|
if err != nil {
|
|
// transcodeSource returns io.EOF only at the true (final) end; any
|
|
// other error means ffmpeg failed or the read timed out. Either
|
|
// way the stream is over — close the body.
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// parseByteRange parses a single "bytes=start-[end]" header into (start, end).
|
|
// end is -1 when open-ended or absent. Multi-range and suffix ranges
|
|
// ("bytes=-N") are not supported (returns start=0) — the browser falls back to
|
|
// a normal open-ended request, which is all <video> needs for a growing source.
|
|
func parseByteRange(header string) (start, end int64) {
|
|
end = -1
|
|
if !strings.HasPrefix(header, "bytes=") {
|
|
return 0, -1
|
|
}
|
|
spec := strings.TrimPrefix(header, "bytes=")
|
|
if i := strings.IndexByte(spec, ','); i >= 0 {
|
|
spec = spec[:i] // first range only
|
|
}
|
|
dash := strings.IndexByte(spec, '-')
|
|
if dash < 0 {
|
|
return 0, -1
|
|
}
|
|
startStr := strings.TrimSpace(spec[:dash])
|
|
if startStr == "" {
|
|
// Suffix range "bytes=-N" (last N bytes) is unsupported on a growing
|
|
// source whose total isn't fixed — serve open-ended from 0 instead of
|
|
// mis-reading N as an absolute end. fMP4 (moov at front) never needs it.
|
|
return 0, -1
|
|
}
|
|
if v, err := strconv.ParseInt(startStr, 10, 64); err == nil && v >= 0 {
|
|
start = v
|
|
}
|
|
if e := strings.TrimSpace(spec[dash+1:]); e != "" {
|
|
if v, err := strconv.ParseInt(e, 10, 64); err == nil && v >= 0 {
|
|
end = v
|
|
}
|
|
}
|
|
return start, end
|
|
}
|
|
|
|
// EstimatedProgress returns estimated watch progress percentage (0-100)
|
|
// and the total duration in seconds (0 if unknown).
|
|
func (ss *StreamServer) EstimatedProgress() (pct int, durationSec int) {
|
|
total := ss.totalFileSize.Load()
|
|
if total <= 0 {
|
|
return 0, 0
|
|
}
|
|
maxOffset := ss.maxByteOffset.Load()
|
|
p := int(float64(maxOffset) / float64(total) * 100)
|
|
if p > 100 {
|
|
p = 100
|
|
}
|
|
return p, int(ss.durationSec.Load())
|
|
}
|
|
|
|
// --- File Providers ---
|
|
|
|
// NewDiskFileProvider creates a FileProvider that serves a file from disk.
|
|
func NewDiskFileProvider(filePath string) FileProvider {
|
|
return &diskFileProvider{
|
|
path: filePath,
|
|
name: filepath.Base(filePath),
|
|
}
|
|
}
|
|
|
|
// diskFileProvider serves a file from disk.
|
|
type diskFileProvider struct {
|
|
path string
|
|
name string
|
|
}
|
|
|
|
func (p *diskFileProvider) NewFileReader(_ context.Context) io.ReadSeekCloser {
|
|
f, err := os.Open(p.path)
|
|
if err != nil {
|
|
log.Printf("[stream] failed to open %q: %v", p.path, err)
|
|
return nil
|
|
}
|
|
return f
|
|
}
|
|
|
|
func (p *diskFileProvider) FileName() string { return p.name }
|
|
|
|
func (p *diskFileProvider) FileSize() int64 {
|
|
fi, err := os.Stat(p.path)
|
|
if err != nil {
|
|
log.Printf("[stream] failed to stat %q: %v", p.path, err)
|
|
return 0
|
|
}
|
|
return fi.Size()
|
|
}
|
|
|
|
// NewTorrentFileProvider creates a FileProvider from an active torrent file.
|
|
func NewTorrentFileProvider(file *torrent.File) FileProvider {
|
|
return &torrentFileProvider{file: file}
|
|
}
|
|
|
|
// torrentFileProvider wraps a torrent.File to implement FileProvider.
|
|
type torrentFileProvider struct {
|
|
file *torrent.File
|
|
}
|
|
|
|
func (p *torrentFileProvider) NewFileReader(ctx context.Context) io.ReadSeekCloser {
|
|
reader := p.file.NewReader()
|
|
reader.SetResponsive()
|
|
reader.SetReadahead(5 * 1024 * 1024)
|
|
reader.SetContext(ctx)
|
|
return reader
|
|
}
|
|
|
|
func (p *torrentFileProvider) FileName() string {
|
|
return filepath.Base(p.file.DisplayPath())
|
|
}
|
|
|
|
func (p *torrentFileProvider) FileSize() int64 {
|
|
return p.file.Length()
|
|
}
|
|
|
|
// --- Utility functions ---
|
|
|
|
// FindVideoFile scans a directory (recursively) for the largest video file.
|
|
func FindVideoFile(dir string) string {
|
|
var best string
|
|
var bestSize int64
|
|
|
|
filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
|
|
if err != nil || d.IsDir() {
|
|
return nil
|
|
}
|
|
ext := strings.ToLower(filepath.Ext(d.Name()))
|
|
if !VideoExts[ext] {
|
|
return nil
|
|
}
|
|
info, err := d.Info()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if info.Size() > bestSize {
|
|
best = path
|
|
bestSize = info.Size()
|
|
}
|
|
return nil
|
|
})
|
|
return best
|
|
}
|
|
|
|
// LanIP returns the machine's LAN IP, or "" if unavailable.
|
|
func LanIP() string {
|
|
conn, err := net.Dial("udp", "8.8.8.8:80")
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
defer conn.Close()
|
|
return conn.LocalAddr().(*net.UDPAddr).IP.String()
|
|
}
|
|
|
|
// TailscaleIP returns the Tailscale IPv4 address, or "" if Tailscale isn't running.
|
|
func TailscaleIP() string {
|
|
out, err := exec.Command("tailscale", "ip", "-4").Output()
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
ip := strings.TrimSpace(string(out))
|
|
if net.ParseIP(ip) == nil {
|
|
return ""
|
|
}
|
|
return ip
|
|
}
|
|
|
|
// trackingReader wraps an io.ReadSeekCloser with:
|
|
// - Progress tracking: atomically updates maxByteOffset on Read (not Seek).
|
|
// - Rate limiting: token bucket throttle at ~2x video bitrate so that
|
|
// bytes-read ≈ playback position. Without this, local/NAS files get
|
|
// downloaded instantly and progress jumps to 100%.
|
|
//
|
|
// Rate limiting happens AFTER each Read (sleep to pace), never before.
|
|
// This ensures the client always receives data and never times out.
|
|
type trackingReader struct {
|
|
inner io.ReadSeekCloser
|
|
server *StreamServer
|
|
id int64 // unique ID for this reader
|
|
pos int64 // current read position
|
|
bytesRead int64 // total bytes read by THIS connection (measures sequential progress)
|
|
bytesPerSec int64 // 0 = unlimited (remote/torrent), >0 = throttled (local disk)
|
|
|
|
// Token bucket state
|
|
tokens int64 // available bytes to serve (can go negative = we're ahead)
|
|
lastFill time.Time // last time tokens were replenished
|
|
burstSize int64 // max token accumulation (caps how far ahead VLC can buffer)
|
|
}
|
|
|
|
func (t *trackingReader) Read(p []byte) (int, error) {
|
|
// Always read immediately — never block before serving data to the client.
|
|
n, err := t.inner.Read(p)
|
|
if n > 0 {
|
|
t.pos += int64(n)
|
|
t.bytesRead += int64(n)
|
|
|
|
// Only the reader that has read the most bytes can update progress.
|
|
// This prevents VLC's metadata/index requests (which read near EOF)
|
|
// from inflating progress to 100%.
|
|
if t.server.topReaderID.Load() == t.id {
|
|
// We own the progress — advance it (never regress)
|
|
for {
|
|
cur := t.server.maxByteOffset.Load()
|
|
if t.pos <= cur || t.server.maxByteOffset.CompareAndSwap(cur, t.pos) {
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
// Try to take over if we've read more than the current progress.
|
|
// CAS loop prevents two goroutines from interleaving their stores.
|
|
for {
|
|
cur := t.server.maxByteOffset.Load()
|
|
if t.bytesRead <= cur {
|
|
break
|
|
}
|
|
if t.server.maxByteOffset.CompareAndSwap(cur, t.pos) {
|
|
t.server.topReaderID.Store(t.id)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Rate limit: sleep AFTER read to pace throughput.
|
|
if t.bytesPerSec > 0 {
|
|
t.fillTokens()
|
|
t.tokens -= int64(n)
|
|
if t.tokens < 0 {
|
|
deficit := -t.tokens
|
|
sleepNs := (deficit * int64(time.Second)) / t.bytesPerSec
|
|
if sleepNs > int64(time.Second) {
|
|
sleepNs = int64(time.Second)
|
|
}
|
|
time.Sleep(time.Duration(sleepNs))
|
|
}
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (t *trackingReader) Seek(offset int64, whence int) (int64, error) {
|
|
newPos, err := t.inner.Seek(offset, whence)
|
|
if err == nil {
|
|
t.pos = newPos
|
|
// Don't update maxByteOffset on Seek — http.ServeContent seeks to EOF
|
|
// to determine size, which would instantly mark progress as 100%.
|
|
// Don't reset tokens — prevents clients from bypassing rate limiting
|
|
// by issuing repeated seeks to refill the token bucket.
|
|
}
|
|
return newPos, err
|
|
}
|
|
|
|
func (t *trackingReader) Close() error { return t.inner.Close() }
|
|
|
|
func (t *trackingReader) fillTokens() {
|
|
now := time.Now()
|
|
elapsed := now.Sub(t.lastFill)
|
|
if elapsed <= 0 {
|
|
return
|
|
}
|
|
newTokens := int64(elapsed.Seconds() * float64(t.bytesPerSec))
|
|
t.tokens += newTokens
|
|
if t.tokens > t.burstSize {
|
|
t.tokens = t.burstSize
|
|
}
|
|
t.lastFill = now
|
|
}
|
|
|
|
// probeMedia holds bitrate and duration extracted by ffprobe.
|
|
type probeMedia struct {
|
|
bitrateBps int64 // bits per second
|
|
durationSec int64 // seconds
|
|
}
|
|
|
|
// probeBitrate uses ffprobe to detect the video bitrate and duration.
|
|
// Returns zero values if ffprobe is not available or the file can't be probed.
|
|
func probeMediaInfo(filePath string) probeMedia {
|
|
// Defense-in-depth: only probe regular files (not FIFOs, devices, etc.)
|
|
if fi, err := os.Stat(filePath); err != nil || !fi.Mode().IsRegular() {
|
|
return probeMedia{}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
out, err := exec.CommandContext(ctx, "ffprobe",
|
|
"-v", "quiet",
|
|
"-print_format", "json",
|
|
"-show_format",
|
|
filePath,
|
|
).Output()
|
|
if err != nil {
|
|
return probeMedia{}
|
|
}
|
|
|
|
var result struct {
|
|
Format struct {
|
|
BitRate string `json:"bit_rate"`
|
|
Duration string `json:"duration"`
|
|
Size string `json:"size"`
|
|
} `json:"format"`
|
|
}
|
|
if err := json.Unmarshal(out, &result); err != nil {
|
|
return probeMedia{}
|
|
}
|
|
|
|
var pm probeMedia
|
|
|
|
// Parse duration
|
|
if result.Format.Duration != "" {
|
|
dur, _ := strconv.ParseFloat(result.Format.Duration, 64)
|
|
if dur > 0 {
|
|
pm.durationSec = int64(dur)
|
|
}
|
|
}
|
|
|
|
// Prefer explicit bit_rate from ffprobe
|
|
if result.Format.BitRate != "" {
|
|
bps, _ := strconv.ParseInt(result.Format.BitRate, 10, 64)
|
|
if bps > 0 {
|
|
pm.bitrateBps = bps
|
|
return pm
|
|
}
|
|
}
|
|
|
|
// Fallback: estimate bitrate from size / duration
|
|
if result.Format.Size != "" && pm.durationSec > 0 {
|
|
size, _ := strconv.ParseInt(result.Format.Size, 10, 64)
|
|
if size > 0 {
|
|
pm.bitrateBps = int64(float64(size) * 8 / float64(pm.durationSec))
|
|
}
|
|
}
|
|
|
|
return pm
|
|
}
|
|
|
|
func mimeTypeFromExt(filename string) string {
|
|
ext := strings.ToLower(filepath.Ext(filename))
|
|
switch ext {
|
|
case ".mp4", ".m4v":
|
|
return "video/mp4"
|
|
case ".mkv":
|
|
return "video/x-matroska"
|
|
case ".avi":
|
|
return "video/x-msvideo"
|
|
case ".webm":
|
|
return "video/webm"
|
|
case ".mov":
|
|
return "video/quicktime"
|
|
case ".ts":
|
|
return "video/mp2t"
|
|
case ".flv":
|
|
return "video/x-flv"
|
|
case ".mpg", ".mpeg":
|
|
return "video/mpeg"
|
|
case ".wmv":
|
|
return "video/x-ms-wmv"
|
|
case ".vob":
|
|
return "video/x-ms-vob"
|
|
default:
|
|
return "application/octet-stream"
|
|
}
|
|
}
|