2026-05-27 10:09:42 +02:00
|
|
|
package engine
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"os"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// probeCacheTTL bounds the lifetime of a cached probe. Freshness is already
// enforced by the cache key, which embeds the file's mtime and size, so the
// TTL is not a freshness guarantee: it exists so that entries for paths that
// are never requested again eventually expire instead of accumulating
// without bound.
const probeCacheTTL time.Duration = 30 * time.Minute
|
|
|
|
|
|
2026-05-27 11:15:44 +02:00
|
|
|
// probeCacheJanitorInterval is the period of the background sweep that
// discards expired entries. A lookup evicts the expired entry it touches,
// but entries that are never requested again (for example after a user
// browses thousands of files and then goes idle) are only reclaimed by this
// sweep. Five minutes is a compromise: frequent enough to keep memory
// bounded, rare enough to cost next to no CPU.
const probeCacheJanitorInterval time.Duration = 5 * time.Minute
|
|
|
|
|
|
2026-05-27 10:09:42 +02:00
|
|
|
type probeCacheEntry struct {
|
|
|
|
|
probe *StreamProbe
|
|
|
|
|
expires time.Time
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// probeCacheKey identifies one cached probe. Besides the path it carries the
// file's modification time and size as reported by os.Stat, so a file that
// changes on disk produces a different key and therefore misses the cache.
type probeCacheKey struct {
	path        string
	mtime, size int64 // mtime is ModTime().UnixNano(); size is FileInfo.Size()
}
|
|
|
|
|
|
|
|
|
|
var (
|
2026-05-27 11:15:44 +02:00
|
|
|
probeCacheMu sync.RWMutex
|
|
|
|
|
probeCache = make(map[probeCacheKey]probeCacheEntry)
|
|
|
|
|
probeCacheJanitor sync.Once
|
2026-05-27 10:09:42 +02:00
|
|
|
)
|
|
|
|
|
|
2026-05-27 11:15:44 +02:00
|
|
|
// startProbeCacheJanitor launches the background sweeper exactly once per
|
|
|
|
|
// process. Lazy — fired on first storeProbeCache. Drops expired entries
|
|
|
|
|
// every probeCacheJanitorInterval. Idempotent (sync.Once).
|
|
|
|
|
func startProbeCacheJanitor() {
|
|
|
|
|
probeCacheJanitor.Do(func() {
|
|
|
|
|
go func() {
|
|
|
|
|
ticker := time.NewTicker(probeCacheJanitorInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
for range ticker.C {
|
|
|
|
|
sweepProbeCache(time.Now())
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sweepProbeCache removes every entry whose expiry is at or before `now`.
|
|
|
|
|
// Exposed for tests; production code calls it indirectly via the janitor
|
|
|
|
|
// goroutine.
|
|
|
|
|
func sweepProbeCache(now time.Time) int {
|
|
|
|
|
probeCacheMu.Lock()
|
|
|
|
|
defer probeCacheMu.Unlock()
|
|
|
|
|
removed := 0
|
|
|
|
|
for k, e := range probeCache {
|
|
|
|
|
if !now.Before(e.expires) {
|
|
|
|
|
delete(probeCache, k)
|
|
|
|
|
removed++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return removed
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-27 10:09:42 +02:00
|
|
|
// lookupProbeCache returns the cached StreamProbe for the given path if its
|
|
|
|
|
// mtime + size still match the value recorded at insert time, AND the cache
|
|
|
|
|
// entry hasn't expired. Any stat failure / mismatch returns (nil, false) so
|
|
|
|
|
// the caller falls through to a fresh ffprobe run.
|
|
|
|
|
func lookupProbeCache(path string) (*StreamProbe, bool) {
|
|
|
|
|
fi, err := os.Stat(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
key := probeCacheKey{
|
|
|
|
|
path: path,
|
|
|
|
|
mtime: fi.ModTime().UnixNano(),
|
|
|
|
|
size: fi.Size(),
|
|
|
|
|
}
|
|
|
|
|
probeCacheMu.RLock()
|
|
|
|
|
entry, ok := probeCache[key]
|
|
|
|
|
probeCacheMu.RUnlock()
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
if time.Now().After(entry.expires) {
|
2026-05-27 11:15:44 +02:00
|
|
|
// Re-check under the write lock so a concurrent re-insert (same key,
|
|
|
|
|
// fresh expiry) isn't accidentally evicted.
|
2026-05-27 10:09:42 +02:00
|
|
|
probeCacheMu.Lock()
|
2026-05-27 11:15:44 +02:00
|
|
|
if cur, stillThere := probeCache[key]; stillThere && time.Now().After(cur.expires) {
|
|
|
|
|
delete(probeCache, key)
|
|
|
|
|
}
|
2026-05-27 10:09:42 +02:00
|
|
|
probeCacheMu.Unlock()
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
return entry.probe, true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// storeProbeCache stashes a fresh probe result under the (path, mtime, size)
|
|
|
|
|
// key. A subsequent ffprobe-skipping HIT requires the file to still have the
|
|
|
|
|
// same mtime + size — anything else (re-encoded, renamed+recreated at the
|
|
|
|
|
// same path, truncated) misses and triggers a re-probe.
|
|
|
|
|
func storeProbeCache(path string, probe *StreamProbe) {
|
|
|
|
|
fi, err := os.Stat(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
key := probeCacheKey{
|
|
|
|
|
path: path,
|
|
|
|
|
mtime: fi.ModTime().UnixNano(),
|
|
|
|
|
size: fi.Size(),
|
|
|
|
|
}
|
|
|
|
|
probeCacheMu.Lock()
|
|
|
|
|
probeCache[key] = probeCacheEntry{
|
|
|
|
|
probe: probe,
|
|
|
|
|
expires: time.Now().Add(probeCacheTTL),
|
|
|
|
|
}
|
|
|
|
|
probeCacheMu.Unlock()
|
2026-05-27 11:15:44 +02:00
|
|
|
// Lazy janitor — fires once per process. No-op after first call.
|
|
|
|
|
startProbeCacheJanitor()
|
2026-05-27 10:09:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ResetProbeCache clears the in-memory probe cache. Test-only.
|
|
|
|
|
func ResetProbeCache() {
|
|
|
|
|
probeCacheMu.Lock()
|
|
|
|
|
probeCache = make(map[probeCacheKey]probeCacheEntry)
|
|
|
|
|
probeCacheMu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ProbeCacheSize returns the number of entries currently cached. Exposed
|
|
|
|
|
// for diagnostics + tests.
|
|
|
|
|
func ProbeCacheSize() int {
|
|
|
|
|
probeCacheMu.RLock()
|
|
|
|
|
defer probeCacheMu.RUnlock()
|
|
|
|
|
return len(probeCache)
|
|
|
|
|
}
|