package engine

import (
	"context"
	"crypto/tls"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/anacrolix/torrent"
)

// StreamURLs holds all available stream URLs keyed by network type.
// Serialized as JSON into the stream_url DB field so the web API can
// pick the best URL based on the browser's IP address.
// Empty entries are omitted from the JSON (omitempty).
type StreamURLs struct {
	LAN       string `json:"lan,omitempty"`
	Tailscale string `json:"ts,omitempty"`
	Public    string `json:"pub,omitempty"`
}

// FileProvider abstracts where to get a file reader for streaming.
type FileProvider interface {
	// NewFileReader opens a fresh reader for one HTTP request. The /stream
	// handler treats a nil result as "file not found".
	NewFileReader(ctx context.Context) io.ReadSeekCloser
	// FileName is used for the MIME type and Content-Disposition of /stream.
	FileName() string
	// FileSize is recorded as the total size when the file is installed.
	FileSize() int64
}

// GrowingSource is a /stream source whose bytes are produced over time by an
// ffmpeg remux/transcode to a temp file (see transcodeSource). It is served
// via manual Range handling (serveGrowing) instead of http.ServeContent,
// which assumes a complete, fixed-size, seekable file. Used by direct-play's
// remux path (gap #3 / 3b): mkv h264/aac → progressive fMP4, no re-encode.
type GrowingSource interface {
	// ReadAt blocks until off+len(p) bytes have been produced, the source is
	// final, or a timeout elapses; near the live edge it returns a short
	// (n>0, nil) read so the caller can stream what exists so far.
	ReadAt(p []byte, off int64) (int, error)
	Size() int64          // bytes produced so far
	Final() bool          // ffmpeg exited — Size() is now the true total
	EstimatedSize() int64 // expected final size, for the scrubber timeline
	FileName() string
	Close() error
}

// StreamServer is a persistent HTTP server that serves one file at a time.
// Start it once with Listen(), then swap files with SetFile()/ClearFile().
// The server stays alive for the entire daemon lifecycle — no port churn.
type StreamServer struct { mu sync.RWMutex provider FileProvider growing GrowingSource // set instead of provider for the progressive-remux path (3b) taskID string // current task being streamed server *http.Server port int url string // best single URL (backward compat) urls StreamURLs // all available URLs by network type upnpMapping *UPnPMapping // TLS — optional HTTPS listener for direct, valid-cert browser playback // (agent-TLS feature). httpsPort 0 = disabled. tlsCert holds the current // server certificate, swapped atomically on renewal; the TLS config reads it // via GetCertificate so a renewed cert applies without dropping the listener. // HTTP (port) keeps serving regardless — loopback players + the funnel use it. httpsPort int httpsServer *http.Server tlsCert atomic.Pointer[tls.Certificate] // enableUPnP gates whether Listen() asks the gateway to publish the // stream port to the WAN. UPnP is opt-in (false by default) because // /stream and /hls have no auth — exposing them on the public internet // would let any scanner enumerate active downloads. LAN and Tailscale // access keep working without UPnP. enableUPnP bool // corsExtraOrigins are operator-configured origins added to the default // allowlist defined in validate.go. Set before Listen(). corsExtraOrigins []string // corsAllowlist is computed at Listen() time and treated as read-only // thereafter so per-request reads need no locking. corsAllowlist map[string]struct{} hls *HLSSessionRegistry // HLS sessions served on /hls//... // streamSecret signs the per-URL stream tokens (see stream_token.go). In // memory only; regenerated each daemon start. requireToken gates whether // remote (non-loopback) /stream and /hls requests must carry a valid token. streamSecret []byte requireToken bool // ffmpegPath is the resolved ffmpeg binary, used by /thumbnail to extract a // single frame on demand. Empty = thumbnails disabled (503). 
Set once before // Listen() via SetFFmpegPath; read-only thereafter so the handler needs no lock. ffmpegPath string lastActivity atomic.Int64 maxByteOffset atomic.Int64 // highest sequential read position (main playback connection) totalFileSize atomic.Int64 bitrateBps atomic.Int64 // video bitrate in bits/sec (from ffprobe, 0 = unknown) durationSec atomic.Int64 // video duration in seconds (from ffprobe, 0 = unknown) topReaderID atomic.Int64 // ID of the reader that set maxByteOffset (only it can advance it) readerCounter atomic.Int64 // monotonic counter for assigning reader IDs } // NewStreamServer creates a stream server bound to the given port. // Call Listen() to start accepting connections, then SetFile() to serve content. // // UPnP is opt-in: call SetUPnPEnabled(true) before Listen() to publish the // stream port on the WAN. Without it, only LAN and Tailscale clients can // reach the server. This matches the security default — /stream and /hls // have no auth, so exposing them to the public internet is something the // operator must explicitly request. func NewStreamServer(port int) *StreamServer { return &StreamServer{ port: port, hls: NewHLSSessionRegistry(), streamSecret: newStreamSecret(), requireToken: true, // secure by default; the agent self-mints tokens } } // StreamSecretHex returns the daemon's stream-token signing key as hex, so it // can be reported to the web (which mints the HLS path token the agent then // verifies). Treat as a secret — it lets the holder mint valid stream tokens. func (ss *StreamServer) StreamSecretHex() string { return hex.EncodeToString(ss.streamSecret) } // SetRequireStreamToken toggles remote stream-token enforcement. Loopback // callers are always exempt. Call before Listen() / before reporting URLs. // Default is true; an operator can disable it via config for debugging. 
func (ss *StreamServer) SetRequireStreamToken(require bool) {
	ss.requireToken = require
}

// checkStreamToken reports whether a request may proceed. With enforcement
// off it always allows; otherwise the token must carry a valid signature for
// scope at the current time. There is deliberately no loopback exemption —
// cloudflared relays public funnel traffic over localhost, so loopback is
// not a trust signal.
func (ss *StreamServer) checkStreamToken(scope, token string) bool {
	enforced := ss.requireToken
	return !enforced || verifyStreamToken(ss.streamSecret, scope, token, time.Now())
}

// SetUPnPEnabled toggles WAN publishing of the stream port. Call before
// Listen(); changes after Listen() are ignored for the active server.
func (ss *StreamServer) SetUPnPEnabled(enabled bool) {
	ss.enableUPnP = enabled
}

// EnableTLS arms the HTTPS listener on httpsPort. Call before Listen(). The
// listener starts even without a certificate installed yet — handshakes fail
// until one is set via SetTLSCertificate, so a certificate issued
// asynchronously applies live without a restart. httpsPort <= 0 is a no-op.
func (ss *StreamServer) EnableTLS(httpsPort int) {
	if httpsPort <= 0 {
		return
	}
	ss.httpsPort = httpsPort
}

// SetTLSCertificate atomically installs or replaces the server certificate
// used by the HTTPS listener. Safe to call at any time (startup or renewal);
// the new certificate applies to the next TLS handshake without dropping the
// listener.
func (ss *StreamServer) SetTLSCertificate(cert *tls.Certificate) {
	ss.tlsCert.Store(cert)
}

// LoadTLSCertificateFromFiles reads a PEM cert+key pair from disk and installs
// it. Returns an error if the pair is missing or invalid — the caller decides
// whether that's fatal (the daemon treats it as "TLS off, HTTP keeps serving").
func (ss *StreamServer) LoadTLSCertificateFromFiles(certPath, keyPath string) error { cert, err := tls.LoadX509KeyPair(certPath, keyPath) if err != nil { return fmt.Errorf("load TLS keypair: %w", err) } ss.SetTLSCertificate(&cert) return nil } // HasTLSCertificate reports whether a server certificate is currently installed. func (ss *StreamServer) HasTLSCertificate() bool { return ss.tlsCert.Load() != nil } // HTTPSPort returns the active HTTPS port, or 0 when TLS is disabled. func (ss *StreamServer) HTTPSPort() int { return ss.httpsPort } // SetFFmpegPath sets the ffmpeg binary used by /thumbnail to extract single // frames on demand. Call before Listen(); empty leaves thumbnails disabled // (the handler returns 503). Read-only after Listen() — no locking in the handler. func (ss *StreamServer) SetFFmpegPath(path string) { ss.ffmpegPath = path } // SetCORSAllowedOrigins replaces the operator-supplied extra origins. The // default allowlist (torrentclaw.com / app.torrentclaw.com / localhost dev // ports) is always merged in. Call before Listen(). func (ss *StreamServer) SetCORSAllowedOrigins(origins []string) { ss.corsExtraOrigins = origins } // writeCORSHeaders writes the per-origin CORS response headers when the // request carries an Origin header that matches the allowlist. Returns true // if the handler must short-circuit (preflight OPTIONS). Media-tag requests // (no Origin header) bypass this entirely. // // `Vary: Origin` is emitted whenever an Origin header is present (matched // or not) so any intermediate cache keys the response per-origin and a // later request with a different origin cannot be served a stale ACAO. func (ss *StreamServer) writeCORSHeaders(w http.ResponseWriter, r *http.Request, expose string) (preflight bool) { origin := r.Header.Get("Origin") if origin == "" { return false } w.Header().Add("Vary", "Origin") if _, ok := ss.corsAllowlist[origin]; !ok { // Unknown origin — do not emit CORS headers so the browser blocks // the response. 
Still return without short-circuiting so a non-CORS // caller (e.g. curl) keeps working. return false } w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Range") if expose != "" { w.Header().Set("Access-Control-Expose-Headers", expose) } if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return true } return false } // HLS returns the HLS session registry for this server. Daemon code uses it // to register a session when the backend asks for HLS playback. func (ss *StreamServer) HLS() *HLSSessionRegistry { return ss.hls } // Listen starts the HTTP server on the configured port. Call once at daemon startup. func (ss *StreamServer) Listen(ctx context.Context) error { // Freeze the CORS allowlist before the first request can land. After // this point the map is treated as read-only so handlers can probe it // without locking. ss.corsAllowlist = buildCORSAllowlist(ss.corsExtraOrigins) mux := http.NewServeMux() mux.HandleFunc("/stream", ss.handler) mux.HandleFunc("/health", ss.healthHandler) mux.HandleFunc("/playlist.m3u", ss.playlistHandler) mux.HandleFunc("/hls/", ss.hlsHandler) mux.HandleFunc("/thumbnail", ss.thumbnailHandler) // SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. 
after agent restart) lc := net.ListenConfig{ Control: func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { _ = setReuseAddr(fd) }) }, } // Try configured port; if busy, try next ports (heartbeat reports actual port to web) var listener net.Listener var listenErr error basePort := ss.port for attempt := 0; attempt < 10; attempt++ { addr := fmt.Sprintf("0.0.0.0:%d", ss.port) listener, listenErr = lc.Listen(ctx, "tcp", addr) if listenErr == nil { break } if !strings.Contains(listenErr.Error(), "address already in use") { return fmt.Errorf("stream server listen on %s: %w", addr, listenErr) } ss.port++ log.Printf("[stream] port %d in use, trying %d", ss.port-1, ss.port) } if listenErr != nil { return fmt.Errorf("stream server: all ports busy (%d-%d): %w", basePort, ss.port, listenErr) } if ss.port != basePort { log.Printf("[stream] using port %d (configured %d was busy)", ss.port, basePort) } ss.port = listener.Addr().(*net.TCPAddr).Port // Collect all reachable URLs by network type if lanIP := LanIP(); lanIP != "" { ss.urls.LAN = fmt.Sprintf("http://%s:%d/stream", lanIP, ss.port) } if tsIP := TailscaleIP(); tsIP != "" { ss.urls.Tailscale = fmt.Sprintf("http://%s:%d/stream", tsIP, ss.port) } if ss.enableUPnP { mapping, err := SetupUPnP(ss.port) if err != nil { log.Printf("[stream] UPnP setup failed: %v (only LAN/Tailscale clients will reach port %d)", err, ss.port) } else { ss.upnpMapping = mapping ss.urls.Public = fmt.Sprintf("http://%s:%d/stream", mapping.ExternalIP, mapping.ExternalPort) } } else { log.Printf("[stream] UPnP disabled — port %d not published to WAN (set downloads.enable_upnp = true to opt in)", ss.port) } // Best single URL for backward compat: Tailscale > LAN > Public > localhost switch { case ss.urls.Tailscale != "": ss.url = ss.urls.Tailscale case ss.urls.LAN != "": ss.url = ss.urls.LAN case ss.urls.Public != "": ss.url = ss.urls.Public default: ss.url = fmt.Sprintf("http://127.0.0.1:%d/stream", ss.port) 
ss.urls.LAN = ss.url } ss.server = &http.Server{ Handler: mux, ReadHeaderTimeout: 5 * time.Second, } go func() { if err := ss.server.Serve(listener); err != nil && err != http.ErrServerClosed { log.Printf("stream server error: %v", err) } }() log.Printf("[stream] server listening on port %d", ss.port) // Optional HTTPS listener (agent-TLS feature). Non-fatal: if it can't bind, // HTTP keeps serving so the funnel + LAN HTTP path are unaffected. if ss.httpsPort > 0 { if err := ss.listenTLS(ctx, mux); err != nil { log.Printf("[stream] HTTPS listener disabled: %v", err) ss.httpsPort = 0 } } return nil } // listenTLS starts the HTTPS listener on ss.httpsPort serving the same mux as // the HTTP server. The certificate is read per-handshake from the atomic holder // (tlsCert) so a renewed cert applies without restarting the listener; until a // cert is installed, handshakes fail cleanly (the HTTP path is unaffected). func (ss *StreamServer) listenTLS(ctx context.Context, mux http.Handler) error { lc := net.ListenConfig{ Control: func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { _ = setReuseAddr(fd) }) }, } var listener net.Listener var err error basePort := ss.httpsPort for attempt := 0; attempt < 10; attempt++ { listener, err = lc.Listen(ctx, "tcp", fmt.Sprintf("0.0.0.0:%d", ss.httpsPort)) if err == nil { break } if !strings.Contains(err.Error(), "address already in use") { return fmt.Errorf("https listen on %d: %w", ss.httpsPort, err) } ss.httpsPort++ } if err != nil { return fmt.Errorf("https: all ports busy (%d-%d): %w", basePort, ss.httpsPort, err) } ss.httpsPort = listener.Addr().(*net.TCPAddr).Port tlsCfg := &tls.Config{ MinVersion: tls.VersionTLS12, NextProtos: []string{"h2", "http/1.1"}, GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) { if cert := ss.tlsCert.Load(); cert != nil { return cert, nil } return nil, fmt.Errorf("no TLS certificate installed") }, } ss.httpsServer = &http.Server{ Handler: 
mux, ReadHeaderTimeout: 5 * time.Second, TLSConfig: tlsCfg, } go func() { // Empty cert/key paths → ServeTLS uses TLSConfig.GetCertificate. if err := ss.httpsServer.ServeTLS(listener, "", ""); err != nil && err != http.ErrServerClosed { log.Printf("[stream] HTTPS server error: %v", err) } }() log.Printf("[stream] HTTPS listening on port %d (certificate installed: %v)", ss.httpsPort, ss.HasTLSCertificate()) return nil } // SetFile atomically swaps the file being served and resets progress tracking. func (ss *StreamServer) SetFile(provider FileProvider, taskID string) { ss.mu.Lock() prevGrowing := ss.growing ss.provider = provider ss.growing = nil // a raw-file provider supersedes any in-flight remux ss.taskID = taskID ss.mu.Unlock() if prevGrowing != nil { _ = prevGrowing.Close() // stop the orphan ffmpeg + drop its temp file } ss.totalFileSize.Store(provider.FileSize()) ss.lastActivity.Store(time.Now().UnixNano()) ss.maxByteOffset.Store(0) ss.topReaderID.Store(0) ss.bitrateBps.Store(0) ss.durationSec.Store(0) // Probe bitrate + duration synchronously so rate-limiting and duration // are available before the first HTTP request arrives. if dp, ok := provider.(*diskFileProvider); ok { pm := probeMediaInfo(dp.path) if pm.bitrateBps > 0 { ss.bitrateBps.Store(pm.bitrateBps) log.Printf("[stream] detected bitrate: %.1f Mbps → throttle at %.1f Mbps", float64(pm.bitrateBps)/1e6, float64(pm.bitrateBps)*2/1e6) } if pm.durationSec > 0 { ss.durationSec.Store(pm.durationSec) } } } // SetGrowingFile serves a progressive-remux source on /stream (hueco #3 / 3b): // ffmpeg `-c copy` mkv→fMP4 to a growing temp file, range-served via // serveGrowing. Supersedes any prior provider/growing source (single-viewer). 
func (ss *StreamServer) SetGrowingFile(src GrowingSource, taskID string) { ss.mu.Lock() prevGrowing := ss.growing ss.growing = src ss.provider = nil ss.taskID = taskID ss.mu.Unlock() if prevGrowing != nil { _ = prevGrowing.Close() } ss.totalFileSize.Store(src.EstimatedSize()) ss.lastActivity.Store(time.Now().UnixNano()) ss.maxByteOffset.Store(0) ss.topReaderID.Store(0) // Rate-limit + bitrate tracking are for raw-file playback; the remux pump // has its own pacing (ffmpeg copy is I/O-bound), so leave them at zero. ss.bitrateBps.Store(0) ss.durationSec.Store(0) } // ClearFile stops serving any file. Subsequent requests return 404. func (ss *StreamServer) ClearFile() { ss.mu.Lock() ss.provider = nil prevGrowing := ss.growing ss.growing = nil ss.taskID = "" ss.mu.Unlock() if prevGrowing != nil { _ = prevGrowing.Close() } ss.totalFileSize.Store(0) ss.maxByteOffset.Store(0) ss.topReaderID.Store(0) ss.bitrateBps.Store(0) ss.durationSec.Store(0) } // CurrentTaskID returns the task ID of the file currently being served. func (ss *StreamServer) CurrentTaskID() string { ss.mu.RLock() defer ss.mu.RUnlock() return ss.taskID } // HasFile returns true if a file (raw provider or growing remux) is being served. func (ss *StreamServer) HasFile() bool { ss.mu.RLock() defer ss.mu.RUnlock() return ss.provider != nil || ss.growing != nil } // URL returns the best single stream URL (backward compat). // URL returns the best single /stream URL, carrying a `?t=` token when // enforcement is on. This is what the one-shot `unarr stream` hands to the // player — and since the best URL is the Tailscale/LAN address (not loopback), // it must be tokenised or a remote-addressed player would be rejected. func (ss *StreamServer) URL() string { return ss.tokenizeStreamURL(ss.url) } // tokenizeStreamURL appends a freshly-minted `?t=` (scope "stream") to a // /stream URL. No-op when the URL is empty or enforcement is off. 
func (ss *StreamServer) tokenizeStreamURL(u string) string { if u == "" || !ss.requireToken { return u } sep := "?" if strings.Contains(u, "?") { sep = "&" } return u + sep + "t=" + mintStreamToken(ss.streamSecret, streamScopeStream, time.Now()) } // URLsJSON returns all available stream URLs as a JSON string, each carrying a // freshly-minted `?t=` stream token when enforcement is on. The web reports // these verbatim to the browser (pass-through), so the token reaches the // player without any web-side minting. func (ss *StreamServer) URLsJSON() string { b, _ := json.Marshal(ss.tokenizedStreamURLs()) return string(b) } // tokenizedStreamURLs appends a `?t=` (scope "stream") to each non-empty // /stream URL. No-op when enforcement is off. func (ss *StreamServer) tokenizedStreamURLs() StreamURLs { if !ss.requireToken { return ss.urls } return StreamURLs{ LAN: ss.tokenizeStreamURL(ss.urls.LAN), Tailscale: ss.tokenizeStreamURL(ss.urls.Tailscale), Public: ss.tokenizeStreamURL(ss.urls.Public), } } // Port returns the bound port. func (ss *StreamServer) Port() int { return ss.port } // IdleSince returns how long since the last HTTP request was received. func (ss *StreamServer) IdleSince() time.Duration { last := ss.lastActivity.Load() if last == 0 { return 0 } return time.Since(time.Unix(0, last)) } // Shutdown gracefully stops the HTTP server and removes the UPnP port mapping. // Call only at daemon shutdown — NOT between file swaps. func (ss *StreamServer) Shutdown(ctx context.Context) error { ss.upnpMapping.Remove() if ss.hls != nil { ss.hls.CloseAll() } if ss.httpsServer != nil { if err := ss.httpsServer.Shutdown(ctx); err != nil { log.Printf("[stream] HTTPS shutdown: %v", err) } } if ss.server != nil { return ss.server.Shutdown(ctx) } return nil } // hlsBaseURLs returns the per-network HLS base URLs for a given session. // The web client picks the first reachable one — same fallback strategy as // the legacy /stream URLs. 
func (ss *StreamServer) hlsBaseURLs(sessionID string) StreamURLs { // Token rides as a path segment so the playlists' relative child URIs // (video/index.m3u8, seg-N.m4s, subs/…) inherit it via relative resolution. base := "/hls/" + sessionID if ss.requireToken { base += "/" + mintStreamToken(ss.streamSecret, streamScopeHLS(sessionID), time.Now()) } var out StreamURLs if ss.urls.LAN != "" { out.LAN = strings.Replace(ss.urls.LAN, "/stream", base, 1) } if ss.urls.Tailscale != "" { out.Tailscale = strings.Replace(ss.urls.Tailscale, "/stream", base, 1) } if ss.urls.Public != "" { out.Public = strings.Replace(ss.urls.Public, "/stream", base, 1) } return out } // HLSURLsJSON returns base URLs for an HLS session as a JSON string for the // session response payload. func (ss *StreamServer) HLSURLsJSON(sessionID string) string { urls := ss.hlsBaseURLs(sessionID) b, _ := json.Marshal(urls) return string(b) } // hlsHandler routes /hls// to the matching HLSSession. // // Recognised resources: // // master.m3u8 — top-level playlist // video/index.m3u8 — video media playlist // video/init.mp4 — fMP4 init segment // video/seg-.m4s — video segment // subs/sub-.m3u8 — per-subtitle media playlist (synthesised) // subs/sub-.vtt — WebVTT subtitle (extracted by ffmpeg) func (ss *StreamServer) hlsHandler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) if ss.writeCORSHeaders(w, r, "Content-Length, Content-Range, Accept-Ranges") { return } rest := strings.TrimPrefix(r.URL.Path, "/hls/") parts := strings.SplitN(rest, "/", 2) if len(parts) == 0 || parts[0] == "" { http.Error(w, "missing session id", http.StatusNotFound) return } sessionID := parts[0] // Reject malformed IDs with the same 404 we return for unknown sessions — // no oracle for the accepted format. 
if !validSessionID.MatchString(sessionID) { http.Error(w, "hls session not found", http.StatusNotFound) return } remainder := "" if len(parts) > 1 { remainder = parts[1] } // Auth: when enforcement is on, the URL is /hls///. // Peel the token segment and verify it (no loopback exemption — funnel // traffic arrives over localhost). 404 on mismatch — same response as an // unknown session, no oracle. if ss.requireToken { sub := strings.SplitN(remainder, "/", 2) if !verifyStreamToken(ss.streamSecret, streamScopeHLS(sessionID), sub[0], time.Now()) { http.Error(w, "hls session not found", http.StatusNotFound) return } if len(sub) < 2 { http.Error(w, "missing resource", http.StatusNotFound) return } remainder = sub[1] } session := ss.hls.Get(sessionID) if session == nil { http.Error(w, "hls session not found", http.StatusNotFound) return } if remainder == "" { http.Error(w, "missing resource", http.StatusNotFound) return } resource := remainder switch { case resource == "master.m3u8": session.ServeMaster(w, r) case resource == "probe.json": w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-cache") _ = json.NewEncoder(w).Encode(session.ProbeInfo()) case resource == "video/index.m3u8": session.ServeVideoPlaylist(w, r) case resource == "video/init.mp4": session.ServeInit(w, r) case strings.HasPrefix(resource, "video/seg-") && strings.HasSuffix(resource, ".m4s"): idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "video/seg-"), ".m4s") idx, err := strconv.Atoi(idxStr) if err != nil { http.Error(w, "bad segment index", http.StatusBadRequest) return } session.ServeSegment(w, r, idx) case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".m3u8"): idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".m3u8") idx, err := strconv.Atoi(idxStr) if err != nil { http.Error(w, "bad subtitle index", http.StatusBadRequest) return } ss.serveSubtitlePlaylist(w, r, session, idx) case 
strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".vtt"): idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".vtt") idx, err := strconv.Atoi(idxStr) if err != nil { http.Error(w, "bad subtitle index", http.StatusBadRequest) return } session.ServeSubtitle(w, r, idx) default: http.Error(w, "unknown hls resource", http.StatusNotFound) } } // serveSubtitlePlaylist generates a single-VTT-segment HLS playlist on the // fly so hls.js can consume it as a regular subtitle rendition. The VTT file // itself is extracted asynchronously by HLSSession.extractSubtitles. func (ss *StreamServer) serveSubtitlePlaylist(w http.ResponseWriter, r *http.Request, session *HLSSession, idx int) { if idx < 0 || idx >= len(session.probe.SubtitleTracks) { http.Error(w, "subtitle out of range", http.StatusNotFound) return } dur := session.durationSec if dur < 1 { dur = 1 } body := strings.Builder{} body.WriteString("#EXTM3U\n") body.WriteString("#EXT-X-VERSION:3\n") body.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n") body.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(dur)+1)) body.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n") body.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", dur)) body.WriteString(fmt.Sprintf("sub-%d.vtt\n", idx)) body.WriteString("#EXT-X-ENDLIST\n") w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") w.Header().Set("Cache-Control", "no-cache") _, _ = io.WriteString(w, body.String()) } // healthHandler responde con el estado del servidor en JSON. // Útil para diagnosticar conectividad desde redes remotas o Tailscale: // // curl http://:/health func (ss *StreamServer) healthHandler(w http.ResponseWriter, r *http.Request) { if ss.writeCORSHeaders(w, r, "") { return } ss.mu.RLock() provider := ss.provider taskID := ss.taskID ss.mu.RUnlock() clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) // Only expose filename/taskID/client to loopback callers (local diagnostics). 
// Remote callers (LAN, Tailscale, UPnP public) get a minimal probe response // so that scanners and unauthenticated peers cannot fingerprint the active // download. The web stream-probe only checks HTTP 200 + Content-Type. // // Use net.IP.IsLoopback so we also accept ::ffff:127.0.0.1 (Linux dual-stack // IPv4-mapped form) and reject the empty-string fallthrough when // SplitHostPort fails on a malformed RemoteAddr — both would otherwise // silently bypass the disclosure boundary. parsedIP := net.ParseIP(clientIP) isLocal := parsedIP != nil && parsedIP.IsLoopback() type healthResponse struct { Status string `json:"status"` Streaming bool `json:"streaming"` File string `json:"file,omitempty"` Task string `json:"task,omitempty"` Port int `json:"port"` Client string `json:"client,omitempty"` } resp := healthResponse{ Status: "ok", Port: ss.port, } if provider != nil { resp.Streaming = true } if isLocal { resp.Client = clientIP if provider != nil { resp.File = provider.FileName() resp.Task = taskID if len(resp.Task) > 8 { resp.Task = resp.Task[:8] } } } w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-cache") json.NewEncoder(w).Encode(resp) //nolint:errcheck } // playlistHandler generates an M3U playlist for VLC with #EXTVLCOPT language hints. // Query params: audioLangs (comma-sep), subLangs (comma-sep), resumeSec, title, streamUrl. // If streamUrl is omitted, uses the current best stream URL. // // VLC fetches this playlist and applies the EXTVLCOPT directives automatically, // enabling automatic audio/subtitle track selection on all VLC platforms (desktop + mobile). func (ss *StreamServer) playlistHandler(w http.ResponseWriter, r *http.Request) { if ss.writeCORSHeaders(w, r, "") { return } q := r.URL.Query() // Sanitize query params: strip CR/LF to prevent M3U directive injection. 
sanitize := func(s string) string { s = strings.ReplaceAll(s, "\n", "") s = strings.ReplaceAll(s, "\r", "") return s } audioLangs := sanitize(q.Get("audioLangs")) subLangs := sanitize(q.Get("subLangs")) resumeSec := sanitize(q.Get("resumeSec")) title := sanitize(q.Get("title")) streamURL := q.Get("streamUrl") // Only accept http(s) URLs to prevent file:// or other URI schemes in the playlist. if streamURL != "" && !strings.HasPrefix(streamURL, "http://") && !strings.HasPrefix(streamURL, "https://") { streamURL = "" } if streamURL == "" { // No self-minting fallback: returning a freshly-tokenised URL for a // param-less request would make /playlist.m3u an open token oracle // (any caller could fetch a valid /stream?t=… here). The web always // passes an already-tokenised streamUrl param; the playlist just echoes // it — the real auth gate is /stream itself. http.Error(w, "no active stream", http.StatusNotFound) return } if title == "" { title = "TorrentClaw Stream" } var b strings.Builder b.WriteString("#EXTM3U\n") b.WriteString(fmt.Sprintf("#EXTINF:-1,%s\n", title)) if audioLangs != "" { b.WriteString(fmt.Sprintf("#EXTVLCOPT:audio-language=%s\n", audioLangs)) } if subLangs != "" { b.WriteString(fmt.Sprintf("#EXTVLCOPT:sub-language=%s\n", subLangs)) } if resumeSec != "" && resumeSec != "0" { b.WriteString(fmt.Sprintf("#EXTVLCOPT:start-time=%s\n", resumeSec)) } b.WriteString("#EXTVLCOPT:network-caching=30000\n") b.WriteString(streamURL + "\n") w.Header().Set("Content-Type", "audio/x-mpegurl") w.Header().Set("Content-Disposition", `inline; filename="stream.m3u"`) w.Header().Set("Cache-Control", "no-cache") fmt.Fprint(w, b.String()) //nolint:errcheck } func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) // Log every incoming request — essential for diagnosing remote/Tailscale issues. 
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range")) // Get current source (raw provider or growing remux; nil if none). ss.mu.RLock() provider := ss.provider growing := ss.growing ss.mu.RUnlock() if provider == nil && growing == nil { http.Error(w, "no active stream", http.StatusNotFound) return } if ss.writeCORSHeaders(w, r, "Content-Length, Content-Range, Accept-Ranges") { return } // Auth: every caller must carry a valid stream token. 404 (not 401/403) so // an unauthorised caller gets no oracle that a stream is active here. if !ss.checkStreamToken(streamScopeStream, r.URL.Query().Get("t")) { log.Printf("[stream] rejected %s — bad/absent token", clientIP) http.Error(w, "no active stream", http.StatusNotFound) return } // Progressive-remux path (3b): a growing fMP4 produced by ffmpeg `-c copy`. // Range-served manually because http.ServeContent needs a complete file. if growing != nil { ss.serveGrowing(w, r, growing) return } rawReader := provider.NewFileReader(r.Context()) if rawReader == nil { http.Error(w, "file not found", http.StatusNotFound) return } defer rawReader.Close() // Wrap reader to track bytes read for progress estimation + rate limit. // Rate limiting at ~2x bitrate ensures VLC can't download far ahead of // playback, so bytes-read ≈ playback position (like Netflix/YouTube). bps := ss.bitrateBps.Load() var bytesPerSec int64 if bps > 0 { bytesPerSec = bps / 8 * 2 // 2x bitrate in bytes/sec } var burstSize int64 if bytesPerSec > 0 { burstSize = bytesPerSec * 30 } reader := &trackingReader{ inner: rawReader, server: ss, id: ss.readerCounter.Add(1), bytesPerSec: bytesPerSec, burstSize: burstSize, tokens: burstSize, lastFill: time.Now(), } w.Header().Set("Content-Type", mimeTypeFromExt(provider.FileName())) // "inline" for play requests (VLC/mpv), "attachment" for download requests. 
disposition := "inline" if r.URL.Query().Get("download") == "1" { disposition = "attachment" } downloadName := provider.FileName() if disposition == "attachment" { ext := filepath.Ext(downloadName) downloadName = strings.TrimSuffix(downloadName, ext) + " [TorrentClaw]" + ext } w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%q", disposition, downloadName)) w.Header().Set("Accept-Ranges", "bytes") http.ServeContent(w, r, provider.FileName(), time.Time{}, reader) } // thumbnailHandler serves ONE JPEG frame decoded from a file at a timestamp. // It backs the web's "file characteristics" panel (frames on demand, hueco // medio): the panel renders a strip of at several positions, each hitting // this route. Independent of the active /stream — no session, no provider, no // effect on playback; ffmpeg just seeks the path and emits a single frame. // // Auth: a token scoped thumb: minted by the web with this agent's // stream secret. The path travels in ?p= (already client-visible — the library // UI shows it) and the token's scope binds that exact path, so a tampered p // fails verification. 404 (not 401/403) on a bad token — no oracle, same as // /stream. The path is additionally clamped to a real regular file as // defense-in-depth against a (trusted) web bug pointing ffmpeg at a device/FIFO. 
func (ss *StreamServer) thumbnailHandler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) if ss.writeCORSHeaders(w, r, "") { return } q := r.URL.Query() rawPath := q.Get("p") if rawPath == "" { http.Error(w, "missing path", http.StatusBadRequest) return } if !ss.checkStreamToken(streamScopeThumb(rawPath), q.Get("t")) { clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) log.Printf("[thumbnail] rejected from %s — bad/absent token", clientIP) http.Error(w, "not found", http.StatusNotFound) return } if fi, err := os.Stat(rawPath); err != nil || !fi.Mode().IsRegular() { http.Error(w, "not found", http.StatusNotFound) return } if ss.ffmpegPath == "" { http.Error(w, "thumbnails unavailable", http.StatusServiceUnavailable) return } pos := parseThumbPos(q.Get("pos")) width := parseThumbWidth(q.Get("w")) // Cap the work: a single keyframe decode is fast, but a corrupt/huge file or // a seek past EOF could hang ffmpeg. 20s is generous for a keyframe seek. ctx, cancel := context.WithTimeout(r.Context(), 20*time.Second) defer cancel() cmd := exec.CommandContext(ctx, ss.ffmpegPath, buildThumbnailArgs(rawPath, pos, width)...) var stderr strings.Builder cmd.Stderr = &stderr out, err := cmd.Output() if err != nil || len(out) == 0 { // A seek past EOF yields no frame — a benign empty output, not an error // worth alarming on. Log at most a short line for diagnosis. log.Printf("[thumbnail] no frame (pos=%.1f w=%d path=%q): err=%v %s", pos, width, rawPath, err, strings.TrimSpace(stderr.String())) http.Error(w, "thumbnail failed", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "image/jpeg") // path+pos is stable content; let the browser cache so re-opening the panel // doesn't re-run ffmpeg. private — it's a frame of the user's own file. 
w.Header().Set("Cache-Control", "private, max-age=3600") w.Header().Set("Content-Length", strconv.Itoa(len(out))) if _, err := w.Write(out); err != nil { log.Printf("[thumbnail] write failed: %v", err) } } // buildThumbnailArgs builds the ffmpeg argv that decodes ONE frame at posSec and // writes a scaled JPEG to stdout. `-ss` BEFORE `-i` does an input (keyframe) // seek — near-constant time regardless of position — instead of decoding from // the start. scale=w:-2 preserves aspect with an even height (mjpeg/yuv420 // requires even dimensions). `-an -sn` drops audio/subtitle streams. func buildThumbnailArgs(path string, posSec float64, width int) []string { return []string{ "-nostdin", "-loglevel", "error", "-ss", strconv.FormatFloat(posSec, 'f', 3, 64), "-i", path, "-frames:v", "1", "-vf", fmt.Sprintf("scale=%d:-2", width), "-an", "-sn", "-f", "mjpeg", "pipe:1", } } // parseThumbPos parses a non-negative seconds offset; defaults to 0 on garbage. func parseThumbPos(s string) float64 { if s == "" { return 0 } v, err := strconv.ParseFloat(s, 64) if err != nil || v < 0 { return 0 } return v } // parseThumbWidth parses the requested width, defaulting to 320 and clamping to // [80, 640] so a caller can't ask ffmpeg to upscale to an absurd size. func parseThumbWidth(s string) int { const def, min, max = 320, 80, 640 if s == "" { return def } v, err := strconv.Atoi(s) if err != nil { return def } if v < min { return min } if v > max { return max } return v } // serveGrowing range-serves a growing remux source (hueco #3 / 3b). Unlike // http.ServeContent it can't rely on a fixed file size: ffmpeg `-c copy` is // still writing, and the final byte count isn't known until it exits. 
So we: // // - advertise an ESTIMATED total (≈ source file size for a copy remux) in // Content-Range so the browser scrubber has a timeline; // - reply 206 and stream from the requested offset, blocking via ReadAt for // not-yet-produced bytes, until the explicit range end or the real EOF; // - send the body chunked (no Content-Length) for non-final sources, since // the true length differs from the estimate — promising an exact length we // can't fulfil would hang the browser. When the source is already final we // send an exact Content-Length. // // Seeking forward into a not-yet-remuxed region blocks briefly until the copy // (I/O-bound, fast) catches up; seeking back to produced bytes is immediate. func (ss *StreamServer) serveGrowing(w http.ResponseWriter, r *http.Request, src GrowingSource) { w.Header().Set("Accept-Ranges", "bytes") w.Header().Set("Content-Type", "video/mp4") w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", src.FileName())) // Total to advertise: exact when ffmpeg has exited, else the estimate. total := src.EstimatedSize() if src.Final() { total = src.Size() } if total <= 0 { total = src.Size() } start, explicitEnd := parseByteRange(r.Header.Get("Range")) if total > 0 && start >= total { // Range beyond what we expect to produce — let the browser recover. w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", total)) http.Error(w, "range not satisfiable", http.StatusRequestedRangeNotSatisfiable) return } if r.Method == http.MethodHead { if total > 0 { w.Header().Set("Content-Length", strconv.FormatInt(total, 10)) } w.WriteHeader(http.StatusOK) return } end := total - 1 if explicitEnd >= 0 && explicitEnd < end { end = explicitEnd } if total > 0 { w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total)) } // Exact Content-Length only when the source is final (true size known) so // we never promise bytes a still-running remux might not produce. 
if src.Final() && explicitEnd < 0 { w.Header().Set("Content-Length", strconv.FormatInt(src.Size()-start, 10)) } w.WriteHeader(http.StatusPartialContent) buf := make([]byte, 256*1024) off := start firstRead := true for { if explicitEnd >= 0 && off > explicitEnd { return } if r.Context().Err() != nil { return // client disconnected / request cancelled } readStart := time.Now() n, err := src.ReadAt(buf, off) // TTFF diagnosis: a read that blocks means the client asked for bytes the // remux hasn't produced yet (a seek ahead of the live edge, or the very // first read before ffmpeg's init lands). Log it so a slow start is // attributable to "waiting on ffmpeg" vs network/decoder. if waited := time.Since(readStart); waited > 250*time.Millisecond { log.Printf("[stream] serveGrowing read off=%d blocked %v (produced=%d est=%d)", off, waited.Round(time.Millisecond), src.Size(), src.EstimatedSize()) } else if firstRead { log.Printf("[stream] serveGrowing start off=%d (produced=%d est=%d)", start, src.Size(), src.EstimatedSize()) } firstRead = false if n > 0 { toWrite := n if explicitEnd >= 0 { if remaining := explicitEnd - off + 1; int64(toWrite) > remaining { toWrite = int(remaining) } } if _, werr := w.Write(buf[:toWrite]); werr != nil { return // client gone } off += int64(toWrite) if f, ok := w.(http.Flusher); ok { f.Flush() } } if err != nil { // transcodeSource returns io.EOF only at the true (final) end; any // other error means ffmpeg failed or the read timed out. Either // way the stream is over — close the body. return } } } // parseByteRange parses a single "bytes=start-[end]" header into (start, end). // end is -1 when open-ended or absent. Multi-range and suffix ranges // ("bytes=-N") are not supported (returns start=0) — the browser falls back to // a normal open-ended request, which is all