diff --git a/server/ffprobe/ffprobe.go b/server/ffprobe/ffprobe.go index 72ecdf78a..bc7476482 100644 --- a/server/ffprobe/ffprobe.go +++ b/server/ffprobe/ffprobe.go @@ -33,20 +33,19 @@ func Exists() bool { } func ProbeUrl(link string) (*ffprobe.ProbeData, error) { - data, err := ffprobe.ProbeURL(getCtx(), link) + data, err := ProbeUrlWithTimeout(link, 5*time.Minute) return data, err } -func ProbeReader(reader io.Reader) (*ffprobe.ProbeData, error) { - data, err := ffprobe.ProbeReader(getCtx(), reader) - return data, err +func ProbeUrlWithTimeout(link string, timeout time.Duration) (*ffprobe.ProbeData, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return ffprobe.ProbeURL(ctx, link) } -func getCtx() context.Context { - ctx, cancel := context.WithCancel(context.Background()) - go func() { - time.Sleep(5 * time.Minute) - cancel() - }() - return ctx +func ProbeReader(reader io.Reader) (*ffprobe.ProbeData, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + data, err := ffprobe.ProbeReader(ctx, reader) + return data, err } diff --git a/server/go.mod b/server/go.mod index e1d3a2b3d..d350cecd8 100644 --- a/server/go.mod +++ b/server/go.mod @@ -17,6 +17,7 @@ require ( github.com/anacrolix/publicip v0.3.1 github.com/anacrolix/torrent v1.59.1 github.com/dustin/go-humanize v1.0.1 + github.com/ebitengine/purego v0.10.1 github.com/gin-contrib/cors v1.7.6 github.com/gin-contrib/location/v2 v2.0.0 github.com/gin-gonic/gin v1.11.0 @@ -32,6 +33,7 @@ require ( golang.org/x/exp v0.0.0-20260527015227-08cc5374adb3 golang.org/x/image v0.33.0 golang.org/x/net v0.55.0 + golang.org/x/sys v0.45.0 golang.org/x/time v0.15.0 gopkg.in/telebot.v4 v4.0.0-beta.7 gopkg.in/vansante/go-ffprobe.v2 v2.2.1 @@ -199,7 +201,6 @@ require ( golang.org/x/crypto v0.51.0 // indirect golang.org/x/mod v0.36.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.45.0 // indirect golang.org/x/telemetry v0.0.0-20260508192327-42602be52be6 // indirect golang.org/x/text v0.37.0 // indirect golang.org/x/tools v0.45.0 // indirect diff --git a/server/go.sum b/server/go.sum index 1899df7c4..83d751701 100644 --- a/server/go.sum +++ b/server/go.sum @@ -225,6 +225,8 @@ github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+m github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/ebitengine/purego v0.10.1 h1:dewVBCBT2GaMu1SrNTYxQhgQBethzfhiwvZiLGP/qyY= +github.com/ebitengine/purego v0.10.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/edsrzf/mmap-go v1.2.0 h1:hXLYlkbaPzt1SaQk+anYwKSRNhufIDCchSPkUD6dD84= github.com/edsrzf/mmap-go v1.2.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/server/gstreamer/config.go b/server/gstreamer/config.go new file mode 100644 index 000000000..9338d3a7f --- /dev/null +++ b/server/gstreamer/config.go @@ -0,0 +1,197 @@ +package gstreamer + +import ( + "encoding/json" + "runtime" + "strings" + "time" + + "server/settings" +) + +type Config struct { + GSTVersion float64 + GSTPath string + Source string + + InactiveMinutes int + + AACBitrateKbps int + SegmentSeconds int + + TranscodeH264 bool + TranscodeH265 bool + TranscodeAV1 bool + TranscodeVP9 bool + VideoBitrate int + + PipelineTimeSeconds int + PipelineAudioQueue int + PipelineVideoQueue int + + TempFS bool + TempFSRing int +} + +func DefaultConfig() Config { + conf := Config{ + GSTVersion: 1.26, + Source: "stream", + InactiveMinutes: 5, + AACBitrateKbps: 256, + SegmentSeconds: 6, + VideoBitrate: 10_000, + PipelineTimeSeconds: 20, + PipelineAudioQueue: 4, + PipelineVideoQueue: 32, + TempFS: true, + } + + if runtime.GOOS == "windows" { + conf.GSTVersion = 1.28 + conf.GSTPath = `C:\Program Files\gstreamer\1.0\mingw_x86_64` + } + + return applySettingsConfig(conf).normalized() +} + +func (c Config) normalized() Config { + if c.InactiveMinutes <= 0 { + c.InactiveMinutes = 5 + } + if c.AACBitrateKbps <= 0 { + c.AACBitrateKbps = 256 + } + if c.SegmentSeconds <= 0 { + c.SegmentSeconds = 6 + } + if c.VideoBitrate <= 0 { + c.VideoBitrate = 10_000 + } + if c.PipelineTimeSeconds <= 0 { + c.PipelineTimeSeconds = 20 + } + if c.PipelineAudioQueue <= 0 { + c.PipelineAudioQueue = 4 + } + if c.PipelineVideoQueue <= 0 { + c.PipelineVideoQueue = 32 + } + if c.TempFSRing < 0 { + c.TempFSRing = 0 + } + if c.GSTVersion <= 0 { + c.GSTVersion = 1.26 + } + c.Source = strings.ToLower(strings.TrimSpace(c.Source)) + if c.Source != "play" { + c.Source = "stream" + } + return c +} + +func (c Config) inactiveDuration() time.Duration { + return time.Duration(c.normalized().InactiveMinutes) * time.Minute +} + +type storedConfig struct { + GSTVersion *float64 + GSTPath *string + Source *string + + InactiveMinutes *int + + AACBitrateKbps *int + SegmentSeconds *int + + TranscodeH264 *bool + TranscodeH265 *bool + TranscodeAV1 *bool + TranscodeVP9 *bool + VideoBitrate *int + + PipelineTimeSeconds *int + PipelineAudioQueue *int + PipelineVideoQueue *int + + TempFS *bool `json:"tempfs"` + TempFSRing *int `json:"tempfs_ring"` +} + +func applySettingsConfig(conf Config) Config { + if settings.Path == "" { + return conf + } + + db := settings.NewJsonDB() + if db == nil { + return conf + } + + var data []byte + for _, name := range []string{"gst", "GStreamer"} { + data = db.Get("Settings", name) + if len(data) > 0 { + break + } + } + if len(data) == 0 { + return conf + } + + var stored storedConfig + if err := json.Unmarshal(data, &stored); err != nil { + return conf + } + + if stored.GSTVersion != nil { + conf.GSTVersion = *stored.GSTVersion + } + if stored.GSTPath != nil { + conf.GSTPath = *stored.GSTPath + } + if stored.Source != nil { + conf.Source = *stored.Source + } + if stored.InactiveMinutes != nil { + conf.InactiveMinutes = *stored.InactiveMinutes + } + if stored.AACBitrateKbps != nil { + conf.AACBitrateKbps = *stored.AACBitrateKbps + } + if stored.SegmentSeconds != nil { + conf.SegmentSeconds = *stored.SegmentSeconds + } + if stored.TranscodeH264 != nil { + conf.TranscodeH264 = *stored.TranscodeH264 + } + if stored.TranscodeH265 != nil { + conf.TranscodeH265 = *stored.TranscodeH265 + } + if stored.TranscodeAV1 != nil { + conf.TranscodeAV1 = *stored.TranscodeAV1 + } + if stored.TranscodeVP9 != nil { + conf.TranscodeVP9 = *stored.TranscodeVP9 + } + if stored.VideoBitrate != nil { + conf.VideoBitrate = *stored.VideoBitrate + } + if stored.PipelineTimeSeconds != nil { + conf.PipelineTimeSeconds = *stored.PipelineTimeSeconds + } + if stored.PipelineAudioQueue != nil { + conf.PipelineAudioQueue = *stored.PipelineAudioQueue + } + if stored.PipelineVideoQueue != nil { + conf.PipelineVideoQueue = *stored.PipelineVideoQueue + } + if stored.TempFS != nil { + conf.TempFS = *stored.TempFS + } + if stored.TempFSRing != nil { + conf.TempFSRing = *stored.TempFSRing + } + + return conf +} diff --git a/server/gstreamer/echo.go b/server/gstreamer/echo.go new file mode 100644 index 000000000..c3a5dce87 --- /dev/null +++ b/server/gstreamer/echo.go @@ -0,0 +1,86 @@ +package gstreamer + +import ( + "context" + "net/http" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/gin-gonic/gin" +) + +type echoResponse struct { + FFProbe componentStatus `json:"ffprobe"` + GStreamer componentStatus `json:"gstreamer"` +} + +type componentStatus struct { + Found bool `json:"found"` + Available bool `json:"available"` + Works bool `json:"works"` +} + +func (s *Service) echo(c *gin.Context) { + c.JSON(http.StatusOK, echoResponse{ + FFProbe: checkFFProbe(), + GStreamer: checkGStreamer(s.conf), + }) +} + +func checkFFProbe() componentStatus { + var status componentStatus + + path, ok := findFFProbeBinary() + if !ok { + return status + } + status.Found = true + + info, err := os.Stat(path) + if err != nil || info.IsDir() { + return status + } + status.Available = true + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := exec.CommandContext(ctx, path, "-version").Run(); err == nil { + status.Works = true + } + return status +} + +func findFFProbeBinary() (string, bool) { + if path, err := exec.LookPath("ffprobe"); err == nil { + return path, true + } + + dirs := []string{"."} + if exe, err := os.Executable(); err == nil { + dirs = append(dirs, filepath.Dir(exe)) + } + + seen := make(map[string]struct{}, len(dirs)) + for _, dir := range dirs { + absDir, err := filepath.Abs(dir) + if err != nil { + continue + } + if _, ok := seen[absDir]; ok { + continue + } + seen[absDir] = struct{}{} + + for _, name := range []string{"ffprobe", "ffprobe.exe"} { + path := filepath.Join(absDir, name) + if _, err := os.Stat(path); err == nil { + return path, true + } + } + } + + return "", false +} diff --git a/server/gstreamer/echo_gst.go b/server/gstreamer/echo_gst.go new file mode 100644 index 000000000..4dbd53159 --- /dev/null +++ b/server/gstreamer/echo_gst.go @@ -0,0 +1,48 @@ +//go:build (windows && (amd64 || arm64)) || (linux && (amd64 || arm64)) || (darwin && (amd64 || arm64)) + +package gstreamer + +import "time" + +func checkGStreamer(conf Config) componentStatus { + gstInitOnce.Do(func() { + initGStreamerRuntime(conf) + }) + + status := gstInitStatus + if !status.Available || gstRuntime == nil { + return status + } + + if checkGStreamerPipeline(gstRuntime) == nil { + status.Works = true + } + return status +} + +func checkGStreamerPipeline(api *gstAPI) error { + pipeline, err := api.parseLaunch("fakesrc num-buffers=1 ! fakesink") + if err != nil { + return err + } + defer api.objectUnref(pipeline) + + bus := api.pipelineGetBus(pipeline) + if bus != 0 { + defer api.objectUnref(bus) + } + + if ret := api.elementSetState(pipeline, gstStatePlaying); ret == gstStateChangeFailure { + _ = api.elementSetState(pipeline, gstStateNull) + return api.popBusError(bus, 0) + } + if ret := api.elementGetState(pipeline, 5*time.Second); ret == gstStateChangeFailure { + _ = api.elementSetState(pipeline, gstStateNull) + return api.popBusError(bus, 0) + } + + if ret := api.elementSetState(pipeline, gstStateNull); ret == gstStateChangeFailure { + return api.popBusError(bus, 0) + } + return nil +} diff --git a/server/gstreamer/echo_gst_stub.go b/server/gstreamer/echo_gst_stub.go new file mode 100644 index 000000000..1d48d6983 --- /dev/null +++ b/server/gstreamer/echo_gst_stub.go @@ -0,0 +1,7 @@ +//go:build !(windows && (amd64 || arm64)) && !(linux && (amd64 || arm64)) && !(darwin && (amd64 || arm64)) + +package gstreamer + +func checkGStreamer(_ Config) componentStatus { + return componentStatus{} +} diff --git a/server/gstreamer/gst_api.go b/server/gstreamer/gst_api.go new file mode 100644 index 000000000..89a98df1a --- /dev/null +++ b/server/gstreamer/gst_api.go @@ -0,0 +1,291 @@ +//go:build (windows && (amd64 || arm64)) || (linux && (amd64 || arm64)) || (darwin && (amd64 || arm64)) + +package gstreamer + +import ( + "errors" + "fmt" + "time" + "unsafe" + + "github.com/ebitengine/purego" +) + +const ( + gstStateNull int32 = 1 + gstStatePaused int32 = 3 + gstStatePlaying int32 = 4 + + gstStateChangeFailure int32 = 0 + + gstFormatTime int32 = 3 + + gstSeekFlagFlush int32 = 1 + gstSeekFlagKeyUnit int32 = 4 + gstSeekFlagSnapAfter int32 = 64 + + gstMapRead int32 = 1 + + gstMessageError int32 = 1 << 1 +) + +type gstAPI struct { + handles []uintptr + + gstInitCheck func(argc unsafe.Pointer, argv unsafe.Pointer, err unsafe.Pointer) int32 + gstParseLaunch func(description string, err unsafe.Pointer) uintptr + gstBinGetByName func(bin uintptr, name string) uintptr + gstObjectUnref func(obj uintptr) + gstMiniObjectUnref func(obj uintptr) + gstElementSetState func(element uintptr, state int32) int32 + gstElementGetState func(element uintptr, state unsafe.Pointer, pending unsafe.Pointer, timeout uint64) int32 + gstElementSeekSimple func(element uintptr, format int32, flags int32, position int64) int32 + gstPipelineGetBus func(pipeline uintptr) uintptr + gstBusTimedPopFiltered func(bus uintptr, timeout uint64, types int32) uintptr + gstMessageParseError func(msg uintptr, err unsafe.Pointer, debug unsafe.Pointer) + gstSampleGetBuffer func(sample uintptr) uintptr + gstSampleUnref func(sample uintptr) + gstBufferGetSize func(buffer uintptr) uintptr + gstBufferMap func(buffer uintptr, mapInfo unsafe.Pointer, flags int32) int32 + gstBufferUnmap func(buffer uintptr, mapInfo unsafe.Pointer) + + gstAppSinkTryPullSample func(sink uintptr, timeout uint64) uintptr + gstAppSinkIsEOS func(sink uintptr) int32 + + gErrorFree func(err uintptr) + gFree func(ptr uintptr) +} + +func (g *gstAPI) bind(gstHandle uintptr, gstAppHandle uintptr, glibHandle uintptr) (err error) { + defer func() { + if recovered := recover(); recovered != nil { + err = fmt.Errorf("gstreamer symbol bind failed: %v", recovered) + } + }() + + purego.RegisterLibFunc(&g.gstInitCheck, gstHandle, "gst_init_check") + purego.RegisterLibFunc(&g.gstParseLaunch, gstHandle, "gst_parse_launch") + purego.RegisterLibFunc(&g.gstBinGetByName, gstHandle, "gst_bin_get_by_name") + purego.RegisterLibFunc(&g.gstObjectUnref, gstHandle, "gst_object_unref") + purego.RegisterLibFunc(&g.gstMiniObjectUnref, gstHandle, "gst_mini_object_unref") + purego.RegisterLibFunc(&g.gstElementSetState, gstHandle, "gst_element_set_state") + purego.RegisterLibFunc(&g.gstElementGetState, gstHandle, "gst_element_get_state") + purego.RegisterLibFunc(&g.gstElementSeekSimple, gstHandle, "gst_element_seek_simple") + purego.RegisterLibFunc(&g.gstPipelineGetBus, gstHandle, "gst_pipeline_get_bus") + purego.RegisterLibFunc(&g.gstBusTimedPopFiltered, gstHandle, "gst_bus_timed_pop_filtered") + purego.RegisterLibFunc(&g.gstMessageParseError, gstHandle, "gst_message_parse_error") + purego.RegisterLibFunc(&g.gstSampleGetBuffer, gstHandle, "gst_sample_get_buffer") + purego.RegisterLibFunc(&g.gstSampleUnref, gstHandle, "gst_sample_unref") + purego.RegisterLibFunc(&g.gstBufferGetSize, gstHandle, "gst_buffer_get_size") + purego.RegisterLibFunc(&g.gstBufferMap, gstHandle, "gst_buffer_map") + purego.RegisterLibFunc(&g.gstBufferUnmap, gstHandle, "gst_buffer_unmap") + + purego.RegisterLibFunc(&g.gstAppSinkTryPullSample, gstAppHandle, "gst_app_sink_try_pull_sample") + purego.RegisterLibFunc(&g.gstAppSinkIsEOS, gstAppHandle, "gst_app_sink_is_eos") + + purego.RegisterLibFunc(&g.gErrorFree, glibHandle, "g_error_free") + purego.RegisterLibFunc(&g.gFree, glibHandle, "g_free") + + return nil +} + +func (g *gstAPI) init() error { + var errPtr uintptr + if g.gstInitCheck(nil, nil, unsafe.Pointer(&errPtr)) == 0 { + msg := g.takeGError(errPtr) + if msg == "" { + msg = "gst_init_check failed" + } + return errors.New(msg) + } + return nil +} + +func (g *gstAPI) parseLaunch(description string) (uintptr, error) { + var errPtr uintptr + pipeline := g.gstParseLaunch(description, unsafe.Pointer(&errPtr)) + if pipeline == 0 { + msg := g.takeGError(errPtr) + if msg == "" { + msg = "gst_parse_launch failed" + } + return 0, errors.New(msg) + } + if errPtr != 0 { + g.gErrorFree(errPtr) + } + return pipeline, nil +} + +func (g *gstAPI) binGetByName(bin uintptr, name string) uintptr { + if bin == 0 { + return 0 + } + return g.gstBinGetByName(bin, name) +} + +func (g *gstAPI) objectUnref(obj uintptr) { + if obj != 0 { + g.gstObjectUnref(obj) + } +} + +func (g *gstAPI) miniObjectUnref(obj uintptr) { + if obj != 0 { + g.gstMiniObjectUnref(obj) + } +} + +func (g *gstAPI) elementSetState(element uintptr, state int32) int32 { + if element == 0 { + return gstStateChangeFailure + } + return g.gstElementSetState(element, state) +} + +func (g *gstAPI) elementGetState(element uintptr, timeout time.Duration) int32 { + if element == 0 { + return gstStateChangeFailure + } + return g.gstElementGetState(element, nil, nil, uint64(timeout)) +} + +func (g *gstAPI) elementSeekSimple(element uintptr, format int32, flags int32, position int64) bool { + if element == 0 { + return false + } + return g.gstElementSeekSimple(element, format, flags, position) != 0 +} + +func (g *gstAPI) pipelineGetBus(pipeline uintptr) uintptr { + if pipeline == 0 { + return 0 + } + return g.gstPipelineGetBus(pipeline) +} + +func (g *gstAPI) appSinkTryPullSample(sink uintptr, timeout uint64) uintptr { + if sink == 0 { + return 0 + } + return g.gstAppSinkTryPullSample(sink, timeout) +} + +func (g *gstAPI) appSinkIsEOS(sink uintptr) bool { + return sink != 0 && g.gstAppSinkIsEOS(sink) != 0 +} + +func (g *gstAPI) sampleUnref(sample uintptr) { + if sample != 0 { + g.gstSampleUnref(sample) + } +} + +func (g *gstAPI) sampleBytes(sample uintptr) []byte { + if sample == 0 { + return nil + } + + buffer := g.gstSampleGetBuffer(sample) + if buffer == 0 || g.gstBufferGetSize(buffer) == 0 { + return nil + } + + var mapInfo [128]byte + if g.gstBufferMap(buffer, unsafe.Pointer(&mapInfo[0]), gstMapRead) == 0 { + return nil + } + defer g.gstBufferUnmap(buffer, unsafe.Pointer(&mapInfo[0])) + + dataPtr, size := gstMapInfoData(&mapInfo) + if dataPtr == 0 || size == 0 { + return nil + } + + data := unsafe.Slice((*byte)(unsafe.Pointer(dataPtr)), size) + return cloneBytes(data) +} + +func (g *gstAPI) popBusError(bus uintptr, timeout time.Duration) error { + if bus == 0 { + return nil + } + + msg := g.gstBusTimedPopFiltered(bus, uint64(timeout), gstMessageError) + if msg == 0 { + return nil + } + + defer g.miniObjectUnref(msg) + message := g.parseMessageError(msg) + if message == "" { + message = "gstreamer bus error" + } + return errors.New(message) +} + +func (g *gstAPI) parseMessageError(msg uintptr) string { + var errPtr uintptr + var debugPtr uintptr + g.gstMessageParseError(msg, unsafe.Pointer(&errPtr), unsafe.Pointer(&debugPtr)) + + message := g.takeGError(errPtr) + if debug := cString(debugPtr); debug != "" { + if message != "" { + message += ": " + debug + } else { + message = debug + } + } + if debugPtr != 0 { + g.gFree(debugPtr) + } + return message +} + +func (g *gstAPI) takeGError(errPtr uintptr) string { + if errPtr == 0 { + return "" + } + messagePtr := *(*uintptr)(unsafe.Pointer(errPtr + 8)) + message := cString(messagePtr) + g.gErrorFree(errPtr) + return message +} + +func gstMapInfoData(mapInfo *[128]byte) (uintptr, int) { + ptrSize := unsafe.Sizeof(uintptr(0)) + dataOffset := alignTo(uintptr(ptrSize)+4, uintptr(ptrSize)) + sizeOffset := dataOffset + uintptr(ptrSize) + + dataPtr := *(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(&mapInfo[0])) + dataOffset)) + size := *(*uintptr)(unsafe.Pointer(uintptr(unsafe.Pointer(&mapInfo[0])) + sizeOffset)) + return dataPtr, int(size) +} + +func alignTo(value uintptr, alignment uintptr) uintptr { + if alignment == 0 { + return value + } + remainder := value % alignment + if remainder == 0 { + return value + } + return value + alignment - remainder +} + +func cString(ptr uintptr) string { + if ptr == 0 { + return "" + } + + var out []byte + for offset := uintptr(0); ; offset++ { + b := *(*byte)(unsafe.Pointer(ptr + offset)) + if b == 0 { + break + } + out = append(out, b) + } + return string(out) +} diff --git a/server/gstreamer/gst_library_darwin.go b/server/gstreamer/gst_library_darwin.go new file mode 100644 index 000000000..edda547a2 --- /dev/null +++ b/server/gstreamer/gst_library_darwin.go @@ -0,0 +1,63 @@ +//go:build darwin && (amd64 || arm64) + +package gstreamer + +import ( + "fmt" + "path/filepath" + + "github.com/ebitengine/purego" +) + +func loadGST(conf Config) (*gstAPI, error) { + glibHandle, err := loadDarwinLibrary(conf, "libglib-2.0.0.dylib", "libglib-2.0.dylib") + if err != nil { + return nil, err + } + gstHandle, err := loadDarwinLibrary(conf, "libgstreamer-1.0.0.dylib", "libgstreamer-1.0.dylib") + if err != nil { + return nil, err + } + gstAppHandle, err := loadDarwinLibrary(conf, "libgstapp-1.0.0.dylib", "libgstapp-1.0.dylib") + if err != nil { + return nil, err + } + + api := &gstAPI{ + handles: []uintptr{glibHandle, gstHandle, gstAppHandle}, + } + if err := api.bind(gstHandle, gstAppHandle, glibHandle); err != nil { + return nil, err + } + return api, nil +} + +func loadDarwinLibrary(conf Config, names ...string) (uintptr, error) { + var lastErr error + for _, name := range names { + for _, candidate := range darwinLibraryCandidates(conf, name) { + handle, err := purego.Dlopen(candidate, purego.RTLD_NOW|purego.RTLD_GLOBAL) + if err == nil { + return handle, nil + } + lastErr = err + } + + handle, err := purego.Dlopen(name, purego.RTLD_NOW|purego.RTLD_GLOBAL) + if err == nil { + return handle, nil + } + lastErr = err + } + + return 0, fmt.Errorf("load %s: %w", names[0], lastErr) +} + +func darwinLibraryCandidates(conf Config, name string) []string { + roots := gstRuntimeRoots(conf) + var candidates []string + for _, root := range roots { + candidates = append(candidates, filepath.Join(root, "lib", name)) + } + return candidates +} diff --git a/server/gstreamer/gst_library_linux.go b/server/gstreamer/gst_library_linux.go new file mode 100644 index 000000000..cae65ea84 --- /dev/null +++ b/server/gstreamer/gst_library_linux.go @@ -0,0 +1,61 @@ +//go:build linux && (amd64 || arm64) + +package gstreamer + +import ( + "fmt" + "path/filepath" + + "github.com/ebitengine/purego" +) + +func loadGST(conf Config) (*gstAPI, error) { + glibHandle, err := loadLinuxLibrary(conf, "libglib-2.0.so.0") + if err != nil { + return nil, err + } + gstHandle, err := loadLinuxLibrary(conf, "libgstreamer-1.0.so.0") + if err != nil { + return nil, err + } + gstAppHandle, err := loadLinuxLibrary(conf, "libgstapp-1.0.so.0") + if err != nil { + return nil, err + } + + api := &gstAPI{ + handles: []uintptr{glibHandle, gstHandle, gstAppHandle}, + } + if err := api.bind(gstHandle, gstAppHandle, glibHandle); err != nil { + return nil, err + } + return api, nil +} + +func loadLinuxLibrary(conf Config, name string) (uintptr, error) { + for _, candidate := range linuxLibraryCandidates(conf, name) { + handle, err := purego.Dlopen(candidate, purego.RTLD_NOW|purego.RTLD_GLOBAL) + if err == nil { + return handle, nil + } + } + + handle, err := purego.Dlopen(name, purego.RTLD_NOW|purego.RTLD_GLOBAL) + if err != nil { + return 0, fmt.Errorf("load %s: %w", name, err) + } + return handle, nil +} + +func linuxLibraryCandidates(conf Config, name string) []string { + if conf.GSTPath == "" { + return nil + } + + return []string{ + filepath.Join(conf.GSTPath, "lib", name), + filepath.Join(conf.GSTPath, "lib64", name), + filepath.Join(conf.GSTPath, "lib", "x86_64-linux-gnu", name), + filepath.Join(conf.GSTPath, "lib", "aarch64-linux-gnu", name), + } +} diff --git a/server/gstreamer/gst_library_windows.go b/server/gstreamer/gst_library_windows.go new file mode 100644 index 000000000..7317fcc09 --- /dev/null +++ b/server/gstreamer/gst_library_windows.go @@ -0,0 +1,59 @@ +//go:build windows && (amd64 || arm64) + +package gstreamer + +import ( + "fmt" + "os" + "path/filepath" + + "golang.org/x/sys/windows" +) + +func loadGST(conf Config) (*gstAPI, error) { + if conf.GSTPath != "" { + gstBin := filepath.Join(conf.GSTPath, "bin") + if _, statErr := os.Stat(gstBin); statErr == nil { + if err := windows.SetDllDirectory(gstBin); err != nil { + return nil, fmt.Errorf("set gstreamer dll directory %q: %w", gstBin, err) + } + } + } + + glibHandle, err := loadWindowsLibrary(conf, "libglib-2.0-0.dll") + if err != nil { + return nil, err + } + gstHandle, err := loadWindowsLibrary(conf, "libgstreamer-1.0-0.dll") + if err != nil { + return nil, err + } + gstAppHandle, err := loadWindowsLibrary(conf, "libgstapp-1.0-0.dll") + if err != nil { + return nil, err + } + + api := &gstAPI{ + handles: []uintptr{uintptr(glibHandle), uintptr(gstHandle), uintptr(gstAppHandle)}, + } + if err := api.bind(uintptr(gstHandle), uintptr(gstAppHandle), uintptr(glibHandle)); err != nil { + return nil, err + } + return api, nil +} + +func loadWindowsLibrary(conf Config, name string) (windows.Handle, error) { + if conf.GSTPath != "" { + fullPath := filepath.Join(conf.GSTPath, "bin", name) + handle, err := windows.LoadLibraryEx(fullPath, 0, windows.LOAD_WITH_ALTERED_SEARCH_PATH) + if err == nil { + return handle, nil + } + } + + handle, err := windows.LoadLibrary(name) + if err != nil { + return 0, fmt.Errorf("load %s: %w", name, err) + } + return handle, nil +} diff --git a/server/gstreamer/handlers.go b/server/gstreamer/handlers.go new file mode 100644 index 000000000..b77781ec4 --- /dev/null +++ b/server/gstreamer/handlers.go @@ -0,0 +1,304 @@ +package gstreamer + +import ( + "context" + "errors" + "io" + "net/http" + "strconv" + "strings" + + "github.com/gin-gonic/gin" +) + +func (s *Service) SetupRoute(route gin.IRouter) { + route.GET("/gst/remove", s.remove) + route.GET("/gst/echo", s.echo) + route.GET("/gst/:hash/heartbeat", s.heartbeat) + route.GET("/gst/:hash/master.m3u8", s.master) + route.GET("/gst/:hash/init.mp4", s.initMP4) + route.GET("/gst/:hash/seg/*segment", s.segment) +} + +func (s *Service) remove(c *gin.Context) { + id := firstNonEmpty(c.Query("hash"), c.Query("id")) + if id == "" { + c.AbortWithError(http.StatusBadRequest, ErrInvalidIdentifier) + return + } + + if !s.TryRemove(id) { + c.Status(http.StatusNotFound) + return + } + + c.JSON(http.StatusOK, gin.H{"success": true}) +} + +func (s *Service) heartbeat(c *gin.Context) { + if s.Get(c.Param("hash")) == nil { + c.Status(http.StatusNotFound) + return + } + + c.Status(http.StatusOK) +} + +func (s *Service) master(c *gin.Context) { + noCache(c) + + hash := c.Param("hash") + fileID := firstNonEmpty(c.Query("index"), c.Query("id"), c.Query("fileID")) + audio := parseQueryInt(c, "audio", 0) + + task, err := s.GetOrAdd(hash, fileID, audio) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + c.String(http.StatusGatewayTimeout, err.Error()) + return + } + c.String(http.StatusBadGateway, err.Error()) + return + } + + duration := task.Probe.DurationSeconds() + if duration <= 0 { + duration = 200 * 60 + } + + segmentSeconds := task.Config.SegmentSeconds + count := duration / segmentSeconds + startIndex := startSegmentIndex(parseQueryInt(c, "seconds", 0), segmentSeconds, count) + startSeconds := startIndex * segmentSeconds + + var playlist strings.Builder + playlist.WriteString("#EXTM3U\n") + playlist.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n") + playlist.WriteString("#EXT-X-VERSION:7\n") + playlist.WriteString("#EXT-X-TARGETDURATION:") + playlist.WriteString(strconv.Itoa(segmentSeconds)) + playlist.WriteByte('\n') + playlist.WriteString("#EXT-X-MEDIA-SEQUENCE:") + playlist.WriteString(strconv.Itoa(startIndex)) + playlist.WriteByte('\n') + playlist.WriteString("#EXT-X-MAP:URI=\"init.mp4?audio=") + playlist.WriteString(strconv.Itoa(audio)) + if startSeconds > 0 { + playlist.WriteString("&seconds=") + playlist.WriteString(strconv.Itoa(startSeconds)) + } + playlist.WriteString("\"\n") + + for i := startIndex; i < count; i++ { + playlist.WriteString("#EXTINF:") + playlist.WriteString(strconv.Itoa(segmentSeconds)) + playlist.WriteString(".00,\n") + playlist.WriteString("seg/") + playlist.WriteString(strconv.Itoa(i)) + playlist.WriteString(".m4s\n") + } + + playlist.WriteString("#EXT-X-ENDLIST\n") + + c.Data(http.StatusOK, "application/vnd.apple.mpegurl; charset=utf-8", []byte(playlist.String())) +} + +func (s *Service) initMP4(c *gin.Context) { + noCache(c) + + task := s.Get(c.Param("hash")) + if task == nil { + c.Status(http.StatusNotFound) + return + } + + audio := parseQueryInt(c, "audio", task.Audio) + startIndex := startSegmentIndex(parseQueryInt(c, "seconds", 0), task.Config.SegmentSeconds, task.Probe.DurationSeconds()/task.Config.SegmentSeconds) + if err := task.EnsureInit(c.Request.Context(), audio, startIndex); err != nil { + c.AbortWithError(http.StatusBadGateway, err) + return + } + + init := task.InitMP4() + if len(init) == 0 { + c.Status(http.StatusBadGateway) + return + } + + c.Header("Content-Length", strconv.Itoa(len(init))) + c.Data(http.StatusOK, "video/mp4", init) +} + +func (s *Service) segment(c *gin.Context) { + noCache(c) + + task := s.Get(c.Param("hash")) + if task == nil { + c.Status(http.StatusNotFound) + return + } + if len(task.InitMP4()) == 0 { + c.Status(http.StatusBadGateway) + return + } + + index, err := parseSegmentIndex(c.Param("segment")) + if err != nil { + c.AbortWithError(http.StatusBadRequest, err) + return + } + + audio := parseQueryInt(c, "audio", task.Audio) + seg, err := task.Segment(c.Request.Context(), index, audio) + if err != nil { + c.AbortWithError(http.StatusBadGateway, err) + return + } + if len(seg.Data) == 0 { + c.Status(http.StatusBadGateway) + return + } + + writeSegment(c, seg) +} + +func writeSegment(c *gin.Context, seg Segment) { + totalLength := int64(seg.Len()) + + c.Header("Content-Type", "video/mp4") + c.Header("Accept-Ranges", "bytes") + + rangeHeader := c.GetHeader("Range") + if rangeHeader == "" { + c.Header("Content-Length", strconv.FormatInt(totalLength, 10)) + _, _ = c.Writer.Write(seg.Data) + return + } + + start, end, ok := parseSingleRange(rangeHeader, totalLength) + if !ok { + c.Header("Content-Range", "bytes */"+strconv.FormatInt(totalLength, 10)) + c.Status(http.StatusRequestedRangeNotSatisfiable) + return + } + + length := end - start + 1 + c.Status(http.StatusPartialContent) + c.Header("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+strconv.FormatInt(totalLength, 10)) + c.Header("Content-Length", strconv.FormatInt(length, 10)) + + _ = copyRange(c.Writer, seg.Data, start, length) +} + +func copyRange(dst io.Writer, data []byte, offset int64, count int64) error { + if count <= 0 || offset < 0 || offset >= int64(len(data)) { + return nil + } + + end := offset + count + if end > int64(len(data)) { + end = int64(len(data)) + } + + _, err := dst.Write(data[offset:end]) + return err +} + +func parseSingleRange(header string, totalLength int64) (int64, int64, bool) { + const prefix = "bytes=" + if totalLength <= 0 || !strings.HasPrefix(header, prefix) { + return 0, 0, false + } + + spec := strings.TrimSpace(strings.TrimPrefix(header, prefix)) + if spec == "" || strings.Contains(spec, ",") { + return 0, 0, false + } + + left, right, ok := strings.Cut(spec, "-") + if !ok { + return 0, 0, false + } + + var start int64 + var end int64 + + if left != "" { + parsedStart, err := strconv.ParseInt(left, 10, 64) + if err != nil { + return 0, 0, false + } + start = parsedStart + + if right == "" { + end = totalLength - 1 + } else { + parsedEnd, err := strconv.ParseInt(right, 10, 64) + if err != nil { + return 0, 0, false + } + end = parsedEnd + } + } else { + suffixLength, err := strconv.ParseInt(right, 10, 64) + if err != nil || suffixLength <= 0 { + return 0, 0, false + } + if suffixLength > totalLength { + suffixLength = totalLength + } + start = totalLength - suffixLength + end = totalLength - 1 + } + + if start < 0 || end < start || start >= totalLength { + return 0, 0, false + } + if end >= totalLength { + end = totalLength - 1 + } + + return start, end, true +} + +func parseSegmentIndex(value string) (int, error) { + value = strings.TrimPrefix(value, "/") + value = strings.TrimSuffix(value, ".m4s") + if value == "" || strings.Contains(value, "/") { + return 0, errors.New("invalid segment index") + } + return strconv.Atoi(value) +} + +func parseQueryInt(c *gin.Context, key string, fallback int) int { + value := c.Query(key) + if value == "" { + return fallback + } + parsed, err := strconv.Atoi(value) + if err != nil { + return fallback + } + return parsed +} + +func startSegmentIndex(seconds int, segmentSeconds int, count int) int { + if seconds <= 0 || segmentSeconds <= 0 { + return 0 + } + + index := seconds / segmentSeconds + if index < 0 { + return 0 + } + if count > 0 && index > count { + return count + } + return index +} + +func noCache(c *gin.Context) { + c.Header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") + c.Header("Pragma", "no-cache") + c.Header("Expires", "0") +} diff --git a/server/gstreamer/init.go b/server/gstreamer/init.go new file mode 100644 index 000000000..b842b266e --- /dev/null +++ b/server/gstreamer/init.go @@ -0,0 +1,49 @@ +package gstreamer + +import ( + "sync" + + "github.com/gin-gonic/gin" +) + +var ( + defaultServiceMu sync.Mutex + defaultService *Service +) + +func SetupRoute(route gin.IRouter) { + getDefaultService().SetupRoute(route) +} + +func Stop() { + defaultServiceMu.Lock() + service := defaultService + defaultService = nil + defaultServiceMu.Unlock() + + if service != nil { + service.Dispose() + } +} + +func Remove(id string) bool { + defaultServiceMu.Lock() + service := defaultService + defaultServiceMu.Unlock() + + if service == nil { + return false + } + return service.TryRemove(id) +} + +func getDefaultService() *Service { + defaultServiceMu.Lock() + defer defaultServiceMu.Unlock() + + if defaultService == nil { + defaultService = NewService(DefaultConfig()) + } + + return defaultService +} diff --git a/server/gstreamer/mp4box.go b/server/gstreamer/mp4box.go new file mode 100644 index 000000000..ef44a1aa3 --- /dev/null +++ b/server/gstreamer/mp4box.go @@ -0,0 +1,1584 @@ +package gstreamer + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "math/bits" +) + +const audioBoundaryToleranceSeconds = 0.100 + +type boxTarget int + +const ( + boxTargetNone boxTarget = iota + boxTargetInit + boxTargetMoof + boxTargetPayload + boxTargetStyp + boxTargetPrefix +) + +const ( + boxStyp = uint32('s')<<24 | uint32('t')<<16 | uint32('y')<<8 | uint32('p') + boxSidx = uint32('s')<<24 | uint32('i')<<16 | uint32('d')<<8 | uint32('x') + boxEmsg = uint32('e')<<24 | uint32('m')<<16 | uint32('s')<<8 | uint32('g') + boxFree = uint32('f')<<24 | uint32('r')<<16 | uint32('e')<<8 | uint32('e') + boxPrft = uint32('p')<<24 | uint32('r')<<16 | uint32('f')<<8 | uint32('t') + boxMoov = uint32('m')<<24 | uint32('o')<<16 | uint32('o')<<8 | uint32('v') + boxMoof = uint32('m')<<24 | uint32('o')<<16 | uint32('o')<<8 | uint32('f') + boxMdat = uint32('m')<<24 | uint32('d')<<16 | uint32('a')<<8 | uint32('t') + boxMfhd = uint32('m')<<24 | uint32('f')<<16 | uint32('h')<<8 | uint32('d') + boxTraf = uint32('t')<<24 | uint32('r')<<16 | uint32('a')<<8 | uint32('f') + boxTfhd = uint32('t')<<24 | uint32('f')<<16 | uint32('h')<<8 | uint32('d') + boxTfdt = uint32('t')<<24 | uint32('f')<<16 | uint32('d')<<8 | uint32('t') + boxTrun = uint32('t')<<24 | uint32('r')<<16 | uint32('u')<<8 | uint32('n') + boxTrak = uint32('t')<<24 | uint32('r')<<16 | uint32('a')<<8 | uint32('k') + boxTkhd = uint32('t')<<24 | uint32('k')<<16 | uint32('h')<<8 | uint32('d') + boxMdia = uint32('m')<<24 | uint32('d')<<16 | uint32('i')<<8 | uint32('a') + boxMdhd = uint32('m')<<24 | uint32('d')<<16 | uint32('h')<<8 | uint32('d') + boxHdlr = uint32('h')<<24 | uint32('d')<<16 | uint32('l')<<8 | uint32('r') + boxMvex = uint32('m')<<24 | uint32('v')<<16 | uint32('e')<<8 | uint32('x') + boxTrex = uint32('t')<<24 | uint32('r')<<16 | uint32('e')<<8 | uint32('x') + + handlerVideo = uint32('v')<<24 | uint32('i')<<16 | uint32('d')<<8 | uint32('e') + handlerAudio = uint32('s')<<24 | uint32('o')<<16 | uint32('u')<<8 | uint32('n') + + tfhdBaseDataOffsetPresent uint32 = 0x000001 + tfhdSampleDescriptionIndexPresent uint32 = 0x000002 + tfhdDefaultSampleDurationPresent uint32 = 0x000008 + tfhdDefaultSampleSizePresent uint32 = 0x000010 + tfhdDefaultSampleFlagsPresent uint32 = 0x000020 + tfhdDefaultBaseIsMoof uint32 = 0x020000 + trunDataOffsetPresent uint32 = 0x000001 + trunFirstSampleFlagsPresent uint32 = 0x000004 + trunSampleDurationPresent uint32 = 0x000100 + trunSampleSizePresent uint32 = 0x000200 + trunSampleFlagsPresent uint32 = 0x000400 + trunCompositionTimeOffsetPresent uint32 = 0x000800 +) + +type trexInfo struct { + duration uint32 + size uint32 + flags uint32 +} + +type trackInfo struct { + id uint32 + timescale uint32 + trex trexInfo +} + +type trexEntry struct { + trackID uint32 + value trexInfo +} + +type mp4Run struct { + box []byte + dataOffsetField int + sourceDataOffset int32 + hasSourceDataOffset bool + duration uint64 + dataSize uint64 + payloadOffset int64 + outputOffset int64 + startsWithSync bool +} + +type mp4Fragment struct { + trackID uint32 + timescale uint32 + decodeTime uint64 + duration uint64 + startsWithSync bool + tfhd []byte + runs []mp4Run + payload []byte +} + +func (f *mp4Fragment) endTime() uint64 { + return f.decodeTime + f.duration +} + +type mp4BoxReader struct { + onInit func([]byte) + onSegment func(Segment) + + segmentSeconds float64 + + init bytes.Buffer + sourceMoof bytes.Buffer + sourceStyp bytes.Buffer + deferred bytes.Buffer + + video []mp4Fragment + audio []mp4Fragment + + sourcePayload bytes.Buffer + prefix bytes.Buffer + prefixActive bool + + pending *mp4Fragment + styp []byte + + boxHeader [16]byte + boxHeaderLength int + boxHeaderRequired int + + currentBoxType uint32 + currentBoxRemaining uint64 + currentTarget boxTarget + + initDone bool + moovCompleted bool + sourcePayloadFromMoof int64 + + videoTrack trackInfo + audioTrack trackInfo + + tfdtOffsetSeconds float64 + sequence uint32 +} + +func Mp4BoxReader(onInit func([]byte), onSegment func(Segment), segmentSeconds float64) *mp4BoxReader { + reader := &mp4BoxReader{ + onInit: onInit, + onSegment: onSegment, + segmentSeconds: segmentSeconds, + boxHeaderRequired: 8, + sequence: 1, + } + if math.IsNaN(segmentSeconds) || math.IsInf(segmentSeconds, 0) || segmentSeconds <= 0 { + reader.segmentSeconds = 6 + } + reader.sourceMoof.Grow(16 * 1024) + reader.sourceStyp.Grow(128) + reader.deferred.Grow(64 * 1024) + return reader +} + +func (r *mp4BoxReader) SeekReset(seconds float64) { + r.initDone = false + r.moovCompleted = false + r.videoTrack = trackInfo{} + r.audioTrack = trackInfo{} + r.sequence = 1 + r.styp = nil + + if !math.IsNaN(seconds) && !math.IsInf(seconds, 0) && seconds > 0 { + r.tfdtOffsetSeconds = seconds + } else { + r.tfdtOffsetSeconds = 0 + } + + r.init.Reset() + r.sourceMoof.Reset() + r.sourceStyp.Reset() + r.deferred.Reset() + r.clearSource() + r.clearFragments() + r.resetPrefix() + r.resetBoxState() +} + +func (r *mp4BoxReader) Push(data []byte) error { + if len(data) == 0 { + return nil + } + + completed, err := r.TryProcessDeferred() + if err != nil { + return err + } + if completed { + _, _ = r.deferred.Write(data) + return nil + } + + consumed, segmentCompleted, err := r.processBytes(data) + if err != nil { + return err + } + if !segmentCompleted { + return nil + } + + if consumed < len(data) { + _, _ = r.deferred.Write(data[consumed:]) + } + return nil +} + +func (r *mp4BoxReader) TryProcessDeferred() (bool, error) { + completed, err := r.tryBuildSegment() + if err != nil { + return false, err + } + if completed { + return true, nil + } + + if r.deferred.Len() == 0 { + return false, nil + } + + data := r.deferred.Bytes() + length := len(data) + consumed, completed, err := r.processBytes(data) + if err != nil { + return false, err + } + + if completed { + r.keepDeferred(consumed) + return true, nil + } + + if consumed != length { + return false, fmt.Errorf("mp4 parser consumed %d of %d deferred bytes", consumed, length) + } + + r.deferred.Reset() + return false, nil +} + +func (r *mp4BoxReader) TryFlushFinalSegment() (bool, error) { + if len(r.video) == 0 || len(r.audio) == 0 { + return false, nil + } + if !r.video[0].startsWithSync { + return false, nil + } + + videoCount := len(r.video) + videoEnd := r.video[videoCount-1].endTime() + + audioCount, err := r.selectAudioCount(videoEnd) + if err != nil { + return false, err + } + if audioCount == 0 { + audioCount = len(r.audio) + } + if audioCount == 0 { + return false, nil + } + + if err := r.buildSegment(videoCount, audioCount); err != nil { + return false, err + } + return true, nil +} + +func (r *mp4BoxReader) TakePendingSegment() (Segment, bool) { + if r.pending == nil || r.sourcePayload.Len() == 0 { + return Segment{}, false + } + + payload := takeBuffer(&r.sourcePayload) + mdatHeaderSize := 8 + if uint64(len(payload))+8 > math.MaxUint32 { + mdatHeaderSize = 16 + } + + var segment bytes.Buffer + segment.Grow(len(r.styp) + r.prefix.Len() + r.sourceMoof.Len() + mdatHeaderSize + len(payload)) + + if r.styp != nil { + _, _ = segment.Write(r.styp) + } + if r.prefixActive && r.prefix.Len() > 0 { + _, _ = segment.Write(r.prefix.Bytes()) + } + _, _ = segment.Write(r.sourceMoof.Bytes()) + writeMdatHeader(&segment, uint64(len(payload)), mdatHeaderSize) + _, _ = segment.Write(payload) + + startSeconds := 0.0 + if r.pending.timescale > 0 { + startSeconds = float64(r.pending.decodeTime) / float64(r.pending.timescale) + } + + r.clearSource() + r.resetPrefix() + + return Segment{ + Data: takeBuffer(&segment), + StartSeconds: startSeconds, + }, true +} + +func (r *mp4BoxReader) processBytes(data []byte) (int, bool, error) { + position := 0 + + for position < len(data) { + if r.boxHeaderLength < r.boxHeaderRequired { + copyLen := minInt(r.boxHeaderRequired-r.boxHeaderLength, len(data)-position) + copy(r.boxHeader[r.boxHeaderLength:r.boxHeaderLength+copyLen], data[position:position+copyLen]) + + r.boxHeaderLength += copyLen + position += copyLen + + if r.boxHeaderLength < r.boxHeaderRequired { + break + } + + if r.boxHeaderRequired == 8 { + size32 := binary.BigEndian.Uint32(r.boxHeader[0:4]) + r.currentBoxType = binary.BigEndian.Uint32(r.boxHeader[4:8]) + + if size32 == 1 { + r.boxHeaderRequired = 16 + continue + } + + if size32 == 0 { + return position, false, errors.New("top-level mp4 box size=0 is not supported") + } + + if err := r.beginBox(uint64(size32), 8); err != nil { + return position, false, err + } + } else { + size64 := binary.BigEndian.Uint64(r.boxHeader[8:16]) + if err := r.beginBox(size64, 16); err != nil { + return position, false, err + } + } + + if r.currentBoxRemaining == 0 { + completed, err := r.completeBox() + if err != nil { + return position, false, err + } + r.resetBoxState() + + if completed { + return position, true, nil + } + } + + continue + } + + bodySize := minInt(len(data)-position, int(minUint64(uint64(len(data)-position), r.currentBoxRemaining))) + if bodySize <= 0 { + break + } + + r.writeCurrentBoxData(data[position : position+bodySize]) + + position += bodySize + r.currentBoxRemaining -= uint64(bodySize) + + if r.currentBoxRemaining == 0 { + completed, err := r.completeBox() + if err != nil { + return position, false, err + } + r.resetBoxState() + + if completed { + return position, true, nil + } + } + } + + return position, false, nil +} + +func (r *mp4BoxReader) beginBox(size uint64, headerSize int) error { + if size < uint64(headerSize) { + return errors.New("invalid mp4 box size") + } + if (r.currentBoxType == boxMoof || r.currentBoxType == boxMdat) && size > math.MaxInt32 { + return errors.New("moof/mdat is too large") + } + + r.currentBoxRemaining = size - uint64(headerSize) + r.currentTarget = boxTargetNone + + if !r.initDone && (r.currentBoxType == boxStyp || r.currentBoxType == boxMoof) { + if err := r.completeInit(); err != nil { + return err + } + } + + if !r.initDone { + if r.currentBoxType == boxMdat { + return errors.New("mdat appeared before init was completed") + } + + r.currentTarget = boxTargetInit + r.writeCurrentBoxData(r.boxHeader[:headerSize]) + return nil + } + + switch r.currentBoxType { + case boxMoof: + if r.pending != nil { + return errors.New("a new moof appeared before the previous mdat") + } + + r.sourceMoof.Reset() + r.sourcePayloadFromMoof = 0 + r.currentTarget = boxTargetMoof + r.writeCurrentBoxData(r.boxHeader[:headerSize]) + return nil + + case boxMdat: + if r.pending == nil { + return errors.New("mdat does not follow a supported moof") + } + + r.sourcePayload.Reset() + r.sourcePayloadFromMoof += int64(headerSize) + r.currentTarget = boxTargetPayload + return nil + + case boxSidx: + if r.pending != nil { + r.sourcePayloadFromMoof += int64(size) + } + return nil + + case boxStyp: + if r.pending != nil { + return errors.New("styp cannot appear between moof and mdat") + } + + r.sourceStyp.Reset() + r.currentTarget = boxTargetStyp + r.writeCurrentBoxData(r.boxHeader[:headerSize]) + return nil + + case boxEmsg, boxFree, boxPrft: + if r.pending != nil { + r.sourcePayloadFromMoof += int64(size) + } + + r.prefixActive = true + r.currentTarget = boxTargetPrefix + r.writeCurrentBoxData(r.boxHeader[:headerSize]) + return nil + } + + return fmt.Errorf("unsupported top-level mp4 box after init: %s", fourCC(r.currentBoxType)) +} + +func (r *mp4BoxReader) writeCurrentBoxData(data []byte) { + if len(data) == 0 { + return + } + + switch r.currentTarget { + case boxTargetInit: + _, _ = r.init.Write(data) + case boxTargetMoof: + _, _ = r.sourceMoof.Write(data) + case boxTargetPayload: + _, _ = r.sourcePayload.Write(data) + case boxTargetStyp: + _, _ = r.sourceStyp.Write(data) + case boxTargetPrefix: + _, _ = r.prefix.Write(data) + } +} + +func (r *mp4BoxReader) completeBox() (bool, error) { + switch r.currentBoxType { + case boxStyp: + if r.styp == nil && r.sourceStyp.Len() > 0 { + r.styp = cloneBytes(r.sourceStyp.Bytes()) + } + r.sourceStyp.Reset() + return false, nil + + case boxMoov: + if r.initDone { + return false, errors.New("unexpected moov after mp4 initialization") + } + r.moovCompleted = true + return false, nil + + case boxMoof: + return false, r.completeMoof() + + case boxMdat: + if err := r.completeMdat(); err != nil { + return false, err + } + completed, err := r.tryBuildSegment() + if err != nil { + return false, err + } + return completed, nil + } + + return false, nil +} + +func (r *mp4BoxReader) completeInit() error { + if !r.moovCompleted || r.init.Len() == 0 { + return errors.New("incomplete mp4 initialization") + } + + init := cloneBytes(r.init.Bytes()) + video, audio, err := parseInitTracks(init) + if err != nil { + return fmt.Errorf("unable to parse mp4 initialization: %w", err) + } + + r.videoTrack = video + r.audioTrack = audio + r.initDone = true + r.onInit(init) + return nil +} + +func (r *mp4BoxReader) completeMoof() error { + fragment, err := parseSourceMoof(r.sourceMoof.Bytes(), r.videoTrack, r.audioTrack) + if err != nil { + return fmt.Errorf("unable to parse source moof: %w", err) + } + + r.pending = fragment + r.sourcePayloadFromMoof = int64(r.sourceMoof.Len()) + return nil +} + +func (r *mp4BoxReader) completeMdat() error { + if r.pending == nil { + return errors.New("completed mdat has no source moof") + } + + payload := takeBuffer(&r.sourcePayload) + if err := attachPayload(r.pending, payload, r.sourcePayloadFromMoof); err != nil { + return err + } + + switch r.pending.trackID { + case r.videoTrack.id: + r.video = append(r.video, *r.pending) + case r.audioTrack.id: + r.audio = append(r.audio, *r.pending) + default: + return fmt.Errorf("unsupported track_ID=%d", r.pending.trackID) + } + + r.pending = nil + r.sourcePayloadFromMoof = 0 + r.sourceMoof.Reset() + return nil +} + +func attachPayload(fragment *mp4Fragment, payload []byte, payloadFromMoof int64) error { + var expected int64 + + for i := range fragment.runs { + run := &fragment.runs[i] + offset := expected + if run.hasSourceDataOffset { + offset = int64(run.sourceDataOffset) - payloadFromMoof + } + + if offset != expected { + return fmt.Errorf("non-contiguous source mdat: expected=%d, actual=%d", expected, offset) + } + if run.dataSize > uint64(math.MaxInt64) { + return errors.New("trun payload is too large") + } + + run.payloadOffset = offset + expected = offset + int64(run.dataSize) + } + + if expected != int64(len(payload)) { + return fmt.Errorf("source mdat size mismatch: trun=%d, mdat=%d", expected, len(payload)) + } + + fragment.payload = payload + return nil +} + +func (r *mp4BoxReader) tryBuildSegment() (bool, error) { + videoCount, err := r.selectVideoCount() + if err != nil || videoCount == 0 { + return false, err + } + + videoEnd := r.video[videoCount-1].endTime() + audioCount, err := r.selectAudioCount(videoEnd) + if err != nil || audioCount == 0 { + return false, err + } + + if err := r.buildSegment(videoCount, audioCount); err != nil { + return false, err + } + return true, nil +} + +func (r *mp4BoxReader) selectVideoCount() (int, error) { + if len(r.video) == 0 { + return 0, nil + } + + if !r.video[0].startsWithSync { + return 0, fmt.Errorf("video segment starts with a non-sync sample at %.6fs", float64(r.video[0].decodeTime)/float64(r.videoTrack.timescale)) + } + + target, err := toUnits(r.segmentSeconds, r.videoTrack.timescale) + if err != nil { + return 0, err + } + + var duration uint64 + for i := 0; i+1 < len(r.video); i++ { + duration += r.video[i].duration + if duration >= target && r.video[i+1].startsWithSync { + return i + 1, nil + } + } + + return 0, nil +} + +func (r *mp4BoxReader) selectAudioCount(videoEnd uint64) (int, error) { + if len(r.audio) == 0 { + return 0, nil + } + + tolerance, err := toUnits(audioBoundaryToleranceSeconds, r.audioTrack.timescale) + if err != nil { + return 0, err + } + + for i := range r.audio { + audioEnd := r.audio[i].endTime() + withTolerance := audioEnd + tolerance + if withTolerance < audioEnd { + withTolerance = math.MaxUint64 + } + + if scaledGreaterOrEqual(withTolerance, r.videoTrack.timescale, videoEnd, r.audioTrack.timescale) { + return i + 1, nil + } + } + + return 0, nil +} + +func (r *mp4BoxReader) buildSegment(videoCount int, audioCount int) error { + if err := validateTrack(r.video, videoCount); err != nil { + return err + } + if err := validateTrack(r.audio, audioCount); err != nil { + return err + } + + var payloadLength int64 + assignOffsets(r.video, videoCount, &payloadLength) + assignOffsets(r.audio, audioCount, &payloadLength) + + videoTrafSize := getTrafSize(r.video, videoCount) + audioTrafSize := getTrafSize(r.audio, audioCount) + moofSize64 := int64(8 + 16 + videoTrafSize + audioTrafSize) + if moofSize64 > math.MaxUint32 { + return errors.New("combined moof is too large") + } + + moofSize := uint32(moofSize64) + mdatHeaderSize := 8 + if uint64(payloadLength)+8 > math.MaxUint32 { + mdatHeaderSize = 16 + } + + var segment bytes.Buffer + segment.Grow(int(moofSize64) + mdatHeaderSize + int(payloadLength) + len(r.styp) + r.prefix.Len()) + + if r.styp != nil { + _, _ = segment.Write(r.styp) + } + if r.prefixActive && r.prefix.Len() > 0 { + _, _ = segment.Write(r.prefix.Bytes()) + } + + writeHeader(&segment, moofSize, boxMoof) + writeMfhd(&segment, r.sequence) + r.sequence++ + if err := r.writeTraf(&segment, r.video, videoCount, moofSize, mdatHeaderSize); err != nil { + return err + } + if err := r.writeTraf(&segment, r.audio, audioCount, moofSize, mdatHeaderSize); err != nil { + return err + } + writeMdatHeader(&segment, uint64(payloadLength), mdatHeaderSize) + appendPayloads(r.video, videoCount, &segment) + appendPayloads(r.audio, audioCount, &segment) + + startSeconds := float64(r.video[0].decodeTime) / float64(r.video[0].timescale) + r.onSegment(Segment{ + Data: takeBuffer(&segment), + StartSeconds: startSeconds, + }) + + r.video = removeFragments(r.video, videoCount) + r.audio = removeFragments(r.audio, audioCount) + r.resetPrefix() + return nil +} + +func validateTrack(fragments []mp4Fragment, count int) error { + first := fragments[0] + expected := first.endTime() + + for i := 1; i < count; i++ { + current := fragments[i] + if current.trackID != first.trackID || + current.timescale != first.timescale || + current.decodeTime != expected || + !bytes.Equal(current.tfhd, first.tfhd) { + return fmt.Errorf("track %d fragments cannot be merged into one traf", first.trackID) + } + + expected = current.endTime() + } + return nil +} + +func assignOffsets(fragments []mp4Fragment, count int, outputOffset *int64) { + for i := 0; i < count; i++ { + fragment := &fragments[i] + baseOffset := *outputOffset + + for j := range fragment.runs { + fragment.runs[j].outputOffset = baseOffset + fragment.runs[j].payloadOffset + } + + *outputOffset += int64(len(fragment.payload)) + } +} + +func getTrafSize(fragments []mp4Fragment, count int) int64 { + size := int64(8 + len(fragments[0].tfhd) + 20) + for i := 0; i < count; i++ { + for _, run := range fragments[i].runs { + size += int64(len(run.box)) + } + } + return size +} + +func (r *mp4BoxReader) writeTraf(output io.Writer, fragments []mp4Fragment, count int, moofSize uint32, mdatHeaderSize int) error { + size := getTrafSize(fragments, count) + if size > math.MaxUint32 { + return errors.New("combined traf is too large") + } + + first := fragments[0] + writeHeader(output, uint32(size), boxTraf) + _, _ = output.Write(first.tfhd) + writeTfdt(output, addTfdtOffset(first.decodeTime, first.timescale, r.tfdtOffsetSeconds)) + + for i := 0; i < count; i++ { + for j := range fragments[i].runs { + run := &fragments[i].runs[j] + dataOffset := int64(moofSize) + int64(mdatHeaderSize) + run.outputOffset + if dataOffset < math.MinInt32 || dataOffset > math.MaxInt32 { + return errors.New("trun.data_offset exceeds int32") + } + writePatchedTrun(output, run, int32(dataOffset)) + } + } + return nil +} + +func writePatchedTrun(output io.Writer, run *mp4Run, dataOffset int32) { + box := run.box + field := run.dataOffsetField + _, _ = output.Write(box[:field]) + + var value [4]byte + binary.BigEndian.PutUint32(value[:], uint32(dataOffset)) + _, _ = output.Write(value[:]) + _, _ = output.Write(box[field+4:]) +} + +func parseSourceMoof(moof []byte, videoTrack trackInfo, audioTrack trackInfo) (*mp4Fragment, error) { + rootPosition := 0 + rootType, moofHeader, moofBox, ok := tryReadBox(moof, &rootPosition) + if !ok || rootType != boxMoof || rootPosition != len(moof) { + return nil, errors.New("buffer does not contain exactly one moof") + } + + position := moofHeader + trafCount := 0 + var fragment *mp4Fragment + + for { + boxType, headerSize, box, ok := tryReadBox(moofBox, &position) + if !ok { + break + } + if boxType != boxTraf { + continue + } + + trafCount++ + if trafCount > 1 { + return nil, errors.New("source moof must contain one traf") + } + + parsed, err := parseTraf(box, headerSize, videoTrack, audioTrack) + if err != nil { + return nil, err + } + fragment = parsed + } + + if fragment == nil { + return nil, errors.New("traf was not found") + } + return fragment, nil +} + +func parseTraf(traf []byte, trafHeader int, videoTrack trackInfo, audioTrack trackInfo) (*mp4Fragment, error) { + var trackID uint32 + var defaultDuration uint32 + var defaultSize uint32 + var defaultFlags uint32 + var hasDefaultFlags bool + var tfhd []byte + var decodeTime uint64 + var hasTfdt bool + + position := trafHeader + for { + boxType, headerSize, box, ok := tryReadBox(traf, &position) + if !ok { + break + } + + switch boxType { + case boxTfhd: + if tfhd != nil { + return nil, errors.New("duplicate tfhd") + } + normalized, parsedTrackID, parsedDuration, parsedSize, parsedFlags, parsedHasFlags, err := normalizeTfhd(box, headerSize) + if err != nil { + return nil, err + } + tfhd = normalized + trackID = parsedTrackID + defaultDuration = parsedDuration + defaultSize = parsedSize + defaultFlags = parsedFlags + hasDefaultFlags = parsedHasFlags + + case boxTfdt: + if hasTfdt { + return nil, errors.New("duplicate tfdt") + } + value, ok := readTfdt(box, headerSize) + if !ok { + return nil, errors.New("invalid tfdt") + } + decodeTime = value + hasTfdt = true + + case boxTrun: + // Parsed after tfhd/trex defaults are known. + + default: + return nil, fmt.Errorf("unsupported box %s inside traf", fourCC(boxType)) + } + } + + if tfhd == nil || !hasTfdt { + return nil, errors.New("tfhd/tfdt was not found") + } + + var timescale uint32 + var trex trexInfo + switch trackID { + case videoTrack.id: + timescale = videoTrack.timescale + trex = videoTrack.trex + case audioTrack.id: + timescale = audioTrack.timescale + trex = audioTrack.trex + default: + return nil, fmt.Errorf("unsupported track_ID=%d", trackID) + } + + if timescale == 0 { + return nil, fmt.Errorf("timescale is zero for track_ID=%d", trackID) + } + + if defaultDuration == 0 { + defaultDuration = trex.duration + } + if defaultSize == 0 { + defaultSize = trex.size + } + if !hasDefaultFlags { + defaultFlags = trex.flags + } + + result := &mp4Fragment{ + trackID: trackID, + timescale: timescale, + decodeTime: decodeTime, + tfhd: tfhd, + } + + var duration uint64 + position = trafHeader + for { + boxType, headerSize, box, ok := tryReadBox(traf, &position) + if !ok { + break + } + if boxType != boxTrun { + continue + } + + run, err := normalizeTrun(box, headerSize, defaultDuration, defaultSize, defaultFlags) + if err != nil { + return nil, err + } + duration += run.duration + result.runs = append(result.runs, run) + } + + if len(result.runs) == 0 || duration == 0 { + return nil, errors.New("trun/duration was not found") + } + + result.duration = duration + result.startsWithSync = result.runs[0].startsWithSync + return result, nil +} + +func normalizeTfhd(box []byte, headerSize int) ([]byte, uint32, uint32, uint32, uint32, bool, error) { + if len(box) < headerSize+8 { + return nil, 0, 0, 0, 0, false, errors.New("tfhd is too small") + } + + versionFlags := binary.BigEndian.Uint32(box[headerSize : headerSize+4]) + version := byte(versionFlags >> 24) + flags := versionFlags & 0x00ffffff + if flags&tfhdBaseDataOffsetPresent != 0 { + return nil, 0, 0, 0, 0, false, errors.New("tfhd.base-data-offset-present is not supported") + } + + trackID := binary.BigEndian.Uint32(box[headerSize+4 : headerSize+8]) + optionalStart := headerSize + 8 + cursor := optionalStart + + if flags&tfhdSampleDescriptionIndexPresent != 0 && !skip(box, &cursor, 4) { + return nil, 0, 0, 0, 0, false, errors.New("invalid tfhd sample_description_index") + } + + var defaultDuration uint32 + var defaultSize uint32 + var defaultFlags uint32 + + if flags&tfhdDefaultSampleDurationPresent != 0 { + value, ok := readUint32(box, &cursor) + if !ok { + return nil, 0, 0, 0, 0, false, errors.New("invalid tfhd default_sample_duration") + } + defaultDuration = value + } + + if flags&tfhdDefaultSampleSizePresent != 0 { + value, ok := readUint32(box, &cursor) + if !ok { + return nil, 0, 0, 0, 0, false, errors.New("invalid tfhd default_sample_size") + } + defaultSize = value + } + + hasDefaultFlags := flags&tfhdDefaultSampleFlagsPresent != 0 + if hasDefaultFlags { + value, ok := readUint32(box, &cursor) + if !ok { + return nil, 0, 0, 0, 0, false, errors.New("invalid tfhd default_sample_flags") + } + defaultFlags = value + } + + if cursor != len(box) || trackID == 0 { + return nil, 0, 0, 0, 0, false, errors.New("invalid tfhd body") + } + + optionalLength := cursor - optionalStart + size := 16 + optionalLength + normalized := make([]byte, size) + binary.BigEndian.PutUint32(normalized[0:4], uint32(size)) + binary.BigEndian.PutUint32(normalized[4:8], boxTfhd) + binary.BigEndian.PutUint32(normalized[8:12], uint32(version)<<24|flags&^tfhdBaseDataOffsetPresent|tfhdDefaultBaseIsMoof) + binary.BigEndian.PutUint32(normalized[12:16], trackID) + copy(normalized[16:], box[optionalStart:cursor]) + + return normalized, trackID, defaultDuration, defaultSize, defaultFlags, hasDefaultFlags, nil +} + +func normalizeTrun(box []byte, headerSize int, defaultDuration uint32, defaultSize uint32, defaultFlags uint32) (mp4Run, error) { + var run mp4Run + if len(box) < headerSize+8 { + return run, errors.New("trun is too small") + } + + versionFlags := binary.BigEndian.Uint32(box[headerSize : headerSize+4]) + version := byte(versionFlags >> 24) + flags := versionFlags & 0x00ffffff + sampleCount := binary.BigEndian.Uint32(box[headerSize+4 : headerSize+8]) + + cursor := headerSize + 8 + if flags&trunDataOffsetPresent != 0 { + if len(box)-cursor < 4 { + return run, errors.New("invalid trun data_offset") + } + run.sourceDataOffset = int32(binary.BigEndian.Uint32(box[cursor : cursor+4])) + run.hasSourceDataOffset = true + cursor += 4 + } + + hasFirstSampleFlags := flags&trunFirstSampleFlagsPresent != 0 + firstSampleFlags := defaultFlags + if hasFirstSampleFlags { + value, ok := readUint32(box, &cursor) + if !ok { + return run, errors.New("invalid trun first_sample_flags") + } + firstSampleFlags = value + } + + hasDuration := flags&trunSampleDurationPresent != 0 + hasSize := flags&trunSampleSizePresent != 0 + if !hasDuration && defaultDuration == 0 { + return run, errors.New("sample duration is absent") + } + if !hasSize && defaultSize == 0 { + return run, errors.New("sample size is absent") + } + + var duration uint64 + var dataSize uint64 + for i := uint32(0); i < sampleCount; i++ { + sampleDuration := defaultDuration + sampleSize := defaultSize + + if hasDuration { + value, ok := readUint32(box, &cursor) + if !ok { + return run, errors.New("invalid trun sample_duration") + } + sampleDuration = value + } + + if hasSize { + value, ok := readUint32(box, &cursor) + if !ok { + return run, errors.New("invalid trun sample_size") + } + sampleSize = value + } + + duration += uint64(sampleDuration) + dataSize += uint64(sampleSize) + + if flags&trunSampleFlagsPresent != 0 { + sampleFlags, ok := readUint32(box, &cursor) + if !ok { + return run, errors.New("invalid trun sample_flags") + } + if i == 0 && !hasFirstSampleFlags { + firstSampleFlags = sampleFlags + } + } + + if flags&trunCompositionTimeOffsetPresent != 0 && !skip(box, &cursor, 4) { + return run, errors.New("invalid trun composition_time_offset") + } + } + + if cursor != len(box) || sampleCount == 0 || duration == 0 || dataSize == 0 { + return run, errors.New("invalid trun body") + } + + hadOffset := flags&trunDataOffsetPresent != 0 + bodyLength := len(box) - headerSize + normalizedSize := 8 + bodyLength + if !hadOffset { + normalizedSize += 4 + } + + normalized := make([]byte, normalizedSize) + binary.BigEndian.PutUint32(normalized[0:4], uint32(normalizedSize)) + binary.BigEndian.PutUint32(normalized[4:8], boxTrun) + binary.BigEndian.PutUint32(normalized[8:12], uint32(version)<<24|flags|trunDataOffsetPresent) + binary.BigEndian.PutUint32(normalized[12:16], sampleCount) + + if hadOffset { + copy(normalized[16:], box[headerSize+8:]) + } else { + binary.BigEndian.PutUint32(normalized[16:20], 0) + copy(normalized[20:], box[headerSize+8:]) + } + + run.box = normalized + run.dataOffsetField = 16 + run.duration = duration + run.dataSize = dataSize + run.startsWithSync = isSyncSample(firstSampleFlags) + return run, nil +} + +func isSyncSample(flags uint32) bool { + const nonSync = 0x00010000 + dependsOn := (flags >> 24) & 0x03 + return flags&nonSync == 0 && dependsOn != 1 +} + +func readTfdt(box []byte, headerSize int) (uint64, bool) { + if len(box) < headerSize+8 { + return 0, false + } + + version := box[headerSize] + offset := headerSize + 4 + switch version { + case 1: + if len(box) < offset+8 { + return 0, false + } + return binary.BigEndian.Uint64(box[offset : offset+8]), true + case 0: + if len(box) < offset+4 { + return 0, false + } + return uint64(binary.BigEndian.Uint32(box[offset : offset+4])), true + } + + return 0, false +} + +func parseInitTracks(init []byte) (trackInfo, trackInfo, error) { + moov, moovHeader, ok := findBox(init, boxMoov) + if !ok { + return trackInfo{}, trackInfo{}, errors.New("moov was not found") + } + + var videoID uint32 + var videoTimescale uint32 + var audioID uint32 + var audioTimescale uint32 + var trexEntries []trexEntry + + position := moovHeader + for { + boxType, header, box, ok := tryReadBox(moov, &position) + if !ok { + break + } + + if boxType == boxTrak { + trackID, timescale, handler := readTrack(box, header) + switch handler { + case handlerVideo: + if videoID != 0 { + return trackInfo{}, trackInfo{}, errors.New("multiple video tracks in mp4mux output") + } + videoID = trackID + videoTimescale = timescale + case handlerAudio: + if audioID != 0 { + return trackInfo{}, trackInfo{}, errors.New("multiple audio tracks in mp4mux output") + } + audioID = trackID + audioTimescale = timescale + } + continue + } + + if boxType != boxMvex { + continue + } + + mvexPosition := header + for { + childType, childHeader, child, ok := tryReadBox(box, &mvexPosition) + if !ok { + break + } + if childType != boxTrex { + continue + } + + entry, ok := readTrex(child, childHeader) + if !ok { + return trackInfo{}, trackInfo{}, errors.New("invalid trex") + } + trexEntries = append(trexEntries, entry) + } + } + + if videoID == 0 || videoTimescale == 0 { + return trackInfo{}, trackInfo{}, errors.New("video track was not found through hdlr=vide") + } + if audioID == 0 || audioTimescale == 0 { + return trackInfo{}, trackInfo{}, errors.New("audio track was not found through hdlr=soun") + } + + return trackInfo{ + id: videoID, + timescale: videoTimescale, + trex: findTrex(trexEntries, videoID), + }, trackInfo{ + id: audioID, + timescale: audioTimescale, + trex: findTrex(trexEntries, audioID), + }, nil +} + +func readTrack(trak []byte, trakHeader int) (uint32, uint32, uint32) { + var trackID uint32 + var timescale uint32 + var handler uint32 + + position := trakHeader + for { + boxType, header, box, ok := tryReadBox(trak, &position) + if !ok { + break + } + + if boxType == boxTkhd { + trackID = readTkhdTrackID(box, header) + continue + } + if boxType != boxMdia { + continue + } + + mdiaPosition := header + for { + mdiaType, mdiaHeader, child, ok := tryReadBox(box, &mdiaPosition) + if !ok { + break + } + switch mdiaType { + case boxMdhd: + timescale = readMdhdTimescale(child, mdiaHeader) + case boxHdlr: + handler = readHandlerType(child, mdiaHeader) + } + } + } + + return trackID, timescale, handler +} + +func readTkhdTrackID(box []byte, header int) uint32 { + if len(box) <= header { + return 0 + } + + offset := -1 + switch box[header] { + case 1: + offset = header + 20 + case 0: + offset = header + 12 + } + + if offset >= 0 && len(box) >= offset+4 { + return binary.BigEndian.Uint32(box[offset : offset+4]) + } + return 0 +} + +func readMdhdTimescale(box []byte, header int) uint32 { + if len(box) <= header { + return 0 + } + + offset := -1 + switch box[header] { + case 1: + offset = header + 20 + case 0: + offset = header + 12 + } + + if offset >= 0 && len(box) >= offset+4 { + return binary.BigEndian.Uint32(box[offset : offset+4]) + } + return 0 +} + +func readHandlerType(box []byte, header int) uint32 { + offset := header + 8 + if len(box) >= offset+4 { + return binary.BigEndian.Uint32(box[offset : offset+4]) + } + return 0 +} + +func readTrex(box []byte, header int) (trexEntry, bool) { + if len(box) < header+24 { + return trexEntry{}, false + } + + trackID := binary.BigEndian.Uint32(box[header+4 : header+8]) + if trackID == 0 { + return trexEntry{}, false + } + + return trexEntry{ + trackID: trackID, + value: trexInfo{ + duration: binary.BigEndian.Uint32(box[header+12 : header+16]), + size: binary.BigEndian.Uint32(box[header+16 : header+20]), + flags: binary.BigEndian.Uint32(box[header+20 : header+24]), + }, + }, true +} + +func findTrex(entries []trexEntry, trackID uint32) trexInfo { + for _, entry := range entries { + if entry.trackID == trackID { + return entry.value + } + } + return trexInfo{} +} + +func findBox(data []byte, requiredType uint32) ([]byte, int, bool) { + position := 0 + for position < len(data) { + start := position + boxType, header, _, ok := tryReadBox(data, &position) + if !ok { + return nil, 0, false + } + if boxType != requiredType { + continue + } + return data[start:position], header, true + } + return nil, 0, false +} + +func tryReadBox(data []byte, position *int) (uint32, int, []byte, bool) { + start := *position + if start < 0 || start > len(data) || len(data)-start < 8 { + return 0, 0, nil, false + } + + size32 := binary.BigEndian.Uint32(data[start : start+4]) + boxType := binary.BigEndian.Uint32(data[start+4 : start+8]) + size := uint64(size32) + headerSize := 8 + + if size32 == 1 { + if len(data)-start < 16 { + return 0, 0, nil, false + } + size = binary.BigEndian.Uint64(data[start+8 : start+16]) + headerSize = 16 + } else if size32 == 0 { + size = uint64(len(data) - start) + } + + if size < uint64(headerSize) || size > math.MaxInt32 || size > uint64(len(data)-start) { + return 0, 0, nil, false + } + + boxSize := int(size) + box := data[start : start+boxSize] + *position = start + boxSize + return boxType, headerSize, box, true +} + +func readUint32(data []byte, position *int) (uint32, bool) { + if *position < 0 || len(data)-*position < 4 { + return 0, false + } + value := binary.BigEndian.Uint32(data[*position : *position+4]) + *position += 4 + return value, true +} + +func skip(data []byte, position *int, count int) bool { + if count < 0 || *position < 0 || len(data)-*position < count { + return false + } + *position += count + return true +} + +func toUnits(seconds float64, timescale uint32) (uint64, error) { + value := seconds * float64(timescale) + if math.IsNaN(value) || math.IsInf(value, 0) || value < 0 || value > float64(math.MaxUint64) { + return 0, errors.New("invalid timeline value") + } + return uint64(math.Ceil(value)), nil +} + +func addTfdtOffset(value uint64, timescale uint32, seconds float64) uint64 { + if seconds <= 0 { + return value + } + + units := seconds * float64(timescale) + if math.IsNaN(units) || math.IsInf(units, 0) || units < 0 || units > float64(math.MaxUint64) { + panic("invalid tfdt offset") + } + + offset := uint64(math.Round(units)) + if math.MaxUint64-value < offset { + panic("tfdt offset overflow") + } + return value + offset +} + +func writeTfdt(output io.Writer, decodeTime uint64) { + var box [20]byte + binary.BigEndian.PutUint32(box[0:4], 20) + binary.BigEndian.PutUint32(box[4:8], boxTfdt) + binary.BigEndian.PutUint32(box[8:12], 0x01000000) + binary.BigEndian.PutUint64(box[12:20], decodeTime) + _, _ = output.Write(box[:]) +} + +func writeMfhd(output io.Writer, sequence uint32) { + var box [16]byte + binary.BigEndian.PutUint32(box[0:4], 16) + binary.BigEndian.PutUint32(box[4:8], boxMfhd) + binary.BigEndian.PutUint32(box[8:12], 0) + binary.BigEndian.PutUint32(box[12:16], sequence) + _, _ = output.Write(box[:]) +} + +func writeHeader(output io.Writer, size uint32, boxType uint32) { + var header [8]byte + binary.BigEndian.PutUint32(header[0:4], size) + binary.BigEndian.PutUint32(header[4:8], boxType) + _, _ = output.Write(header[:]) +} + +func writeMdatHeader(output io.Writer, payloadLength uint64, headerSize int) { + if headerSize == 8 { + writeHeader(output, uint32(payloadLength+8), boxMdat) + return + } + + var header [16]byte + binary.BigEndian.PutUint32(header[0:4], 1) + binary.BigEndian.PutUint32(header[4:8], boxMdat) + binary.BigEndian.PutUint64(header[8:16], payloadLength+16) + _, _ = output.Write(header[:]) +} + +func appendPayloads(fragments []mp4Fragment, count int, output io.Writer) { + for i := 0; i < count; i++ { + _, _ = output.Write(fragments[i].payload) + } +} + +func removeFragments(fragments []mp4Fragment, count int) []mp4Fragment { + if count <= 0 { + return fragments + } + copy(fragments, fragments[count:]) + for i := len(fragments) - count; i < len(fragments); i++ { + fragments[i] = mp4Fragment{} + } + return fragments[:len(fragments)-count] +} + +func (r *mp4BoxReader) clearFragments() { + for i := range r.video { + r.video[i] = mp4Fragment{} + } + for i := range r.audio { + r.audio[i] = mp4Fragment{} + } + r.video = r.video[:0] + r.audio = r.audio[:0] +} + +func (r *mp4BoxReader) resetPrefix() { + r.prefix.Reset() + r.prefixActive = false +} + +func (r *mp4BoxReader) clearSource() { + r.pending = nil + r.sourcePayload.Reset() + r.sourcePayloadFromMoof = 0 + r.sourceMoof.Reset() +} + +func (r *mp4BoxReader) resetBoxState() { + r.boxHeaderLength = 0 + r.boxHeaderRequired = 8 + r.currentBoxType = 0 + r.currentBoxRemaining = 0 + r.currentTarget = boxTargetNone +} + +func (r *mp4BoxReader) keepDeferred(consumed int) { + count := r.deferred.Len() - consumed + if count <= 0 { + r.deferred.Reset() + return + } + + data := r.deferred.Bytes() + copy(data, data[consumed:]) + r.deferred.Truncate(count) +} + +func scaledGreaterOrEqual(left uint64, leftScale uint32, right uint64, rightScale uint32) bool { + leftHi, leftLo := bits.Mul64(left, uint64(leftScale)) + rightHi, rightLo := bits.Mul64(right, uint64(rightScale)) + if leftHi != rightHi { + return leftHi > rightHi + } + return leftLo >= rightLo +} + +func fourCC(boxType uint32) string { + return string([]byte{ + byte(boxType >> 24), + byte(boxType >> 16), + byte(boxType >> 8), + byte(boxType), + }) +} + +func minInt(a int, b int) int { + if a < b { + return a + } + return b +} + +func minUint64(a uint64, b uint64) uint64 { + if a < b { + return a + } + return b +} diff --git a/server/gstreamer/pipeline_gst.go b/server/gstreamer/pipeline_gst.go new file mode 100644 index 000000000..ab8431b06 --- /dev/null +++ b/server/gstreamer/pipeline_gst.go @@ -0,0 +1,603 @@ +//go:build (windows && (amd64 || arm64)) || (linux && (amd64 || arm64)) || (darwin && (amd64 || arm64)) + +package gstreamer + +import ( + "context" + "errors" + "math" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +var ( + gstInitOnce sync.Once + gstRuntime *gstAPI + gstInitErr error + + gstInitStatus componentStatus +) + +type gstRunner struct { + task *Task + + audioIndex int + + statePlaying bool + readySegment struct { + index int + complete bool + segment Segment + } + + positionSeconds atomic.Uint64 + positionSeekSeconds float64 + + reader *mp4BoxReader + + pipeline uintptr + bus uintptr + sink uintptr + + frozen atomic.Bool +} + +func newPipelineRunner(task *Task, audio int) (pipelineRunner, error) { + gstInitOnce.Do(func() { + initGStreamerRuntime(task.Config) + }) + if gstInitErr != nil { + return nil, errors.Join(ErrPipelineDisabled, gstInitErr) + } + + runner := &gstRunner{ + task: task, + audioIndex: validAudioIndex(task.Probe, audio), + reader: Mp4BoxReader( + func(data []byte) { + task.setInitMP4(data) + }, + func(seg Segment) { + r := task.runner + if gr, ok := r.(*gstRunner); ok { + gr.readySegment.segment = seg + gr.readySegment.complete = true + if seg.StartSeconds >= 0 { + gr.setPosition(seg.StartSeconds + gr.positionSeekSeconds) + } + } + }, + float64(task.Config.SegmentSeconds), + ), + } + runner.readySegment.index = -1 + return runner, nil +} + +func initGStreamerRuntime(conf Config) { + setupGStreamer(conf) + + var err error + gstRuntime, err = loadGST(conf) + if err != nil { + gstInitErr = err + return + } + gstInitStatus.Found = true + + if err = gstRuntime.init(); err != nil { + gstInitErr = err + return + } + gstInitStatus.Available = true + gstInitErr = nil +} + +func setupGStreamer(conf Config) { + _ = os.Setenv("GST_REGISTRY", filepath.Join(os.TempDir(), "torrserver-gstreamer-registry.bin")) + + roots := gstRuntimeRoots(conf) + if len(roots) == 0 { + return + } + + for _, root := range roots { + gstBin := filepath.Join(root, "bin") + if _, err := os.Stat(gstBin); err == nil { + prependEnvPath("PATH", gstBin) + } + } + + var gstPlugins string + switch runtime.GOOS { + case "windows": + gstPlugins = filepath.Join(roots[0], "lib", "gstreamer-1.0") + case "linux", "darwin": + gstPlugins = firstExistingPath(gstPluginCandidates(roots)) + } + if gstPlugins != "" { + _ = os.Setenv("GST_PLUGIN_PATH", gstPlugins) + _ = os.Setenv("GST_PLUGIN_SYSTEM_PATH_1_0", gstPlugins) + } + + var gstPluginScanner string + switch runtime.GOOS { + case "windows": + gstPluginScanner = filepath.Join(roots[0], "libexec", "gstreamer-1.0", "gst-plugin-scanner.exe") + case "linux", "darwin": + gstPluginScanner = firstExistingPath(gstPluginScannerCandidates(roots)) + } + if gstPluginScanner != "" { + _ = os.Setenv("GST_PLUGIN_SCANNER", gstPluginScanner) + } +} + +func gstRuntimeRoots(conf Config) []string { + if conf.GSTPath != "" { + return []string{conf.GSTPath} + } + if runtime.GOOS != "darwin" { + return nil + } + return []string{ + "/Library/Frameworks/GStreamer.framework/Versions/1.0", + "/opt/homebrew", + "/usr/local", + } +} + +func gstPluginCandidates(roots []string) []string { + var candidates []string + for _, root := range roots { + candidates = append(candidates, + filepath.Join(root, "lib", "gstreamer-1.0"), + filepath.Join(root, "lib64", "gstreamer-1.0"), + filepath.Join(root, "lib", runtime.GOARCH+"-linux-gnu", "gstreamer-1.0"), + filepath.Join(root, "lib", "x86_64-linux-gnu", "gstreamer-1.0"), + filepath.Join(root, "lib", "aarch64-linux-gnu", "gstreamer-1.0"), + ) + } + return candidates +} + +func gstPluginScannerCandidates(roots []string) []string { + var candidates []string + for _, root := range roots { + candidates = append(candidates, + filepath.Join(root, "libexec", "gstreamer-1.0", "gst-plugin-scanner"), + filepath.Join(root, "lib", "gstreamer-1.0", "gst-plugin-scanner"), + filepath.Join(root, "lib64", "gstreamer-1.0", "gst-plugin-scanner"), + ) + } + return candidates +} + +func firstExistingPath(candidates []string) string { + for _, candidate := range candidates { + if _, err := os.Stat(candidate); err == nil { + return candidate + } + } + return "" +} + +func prependEnvPath(key string, value string) { + if value == "" { + return + } + + current := os.Getenv(key) + if current == "" { + _ = os.Setenv(key, value) + return + } + + separator := string(os.PathListSeparator) + for _, part := range strings.Split(current, separator) { + if strings.EqualFold(part, value) { + return + } + } + + _ = os.Setenv(key, value+separator+current) +} + +func (r *gstRunner) createPipelineArgs() string { + conf := r.task.Config + probe := r.task.Probe + + queueNS := int64(conf.PipelineTimeSeconds) * int64(time.Second) + audioQueueBytes := conf.PipelineAudioQueue * 1024 * 1024 + videoQueueBytes := conf.PipelineVideoQueue * 1024 * 1024 + + var sb strings.Builder + + sb.WriteString("souphttpsrc ") + sb.WriteString("location=\"") + sb.WriteString(r.task.SourceURL) + sb.WriteString("\" is-live=false keep-alive=true timeout=60 retries=5 ") + if conf.GSTVersion >= 1.26 { + sb.WriteString("retry-backoff-factor=0.5 retry-backoff-max=10 ") + } + r.writeSourceQueue(&sb, videoQueueBytes, queueNS) + sb.WriteString(" ! matroskademux name=d ") + + switch { + case probe.IsH264(): + if conf.TranscodeH264 { + r.transcodeToH264(&sb, videoQueueBytes, queueNS) + } else { + sb.WriteString("d.video_0 ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(videoQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! h264parse config-interval=-1 ! h264timestamper ! video/x-h264,stream-format=avc,alignment=au ! mux.video_0 ") + } + + case probe.IsH265(): + if conf.TranscodeH265 { + r.transcodeToH264(&sb, videoQueueBytes, queueNS) + } else { + sb.WriteString("d.video_0 ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(videoQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! h265parse config-interval=-1 ! h265timestamper ! video/x-h265,stream-format=hvc1,alignment=au ! mux.video_0 ") + } + + case probe.IsAV1(): + if conf.TranscodeAV1 { + r.transcodeToH264(&sb, videoQueueBytes, queueNS) + } else { + sb.WriteString("d.video_0 ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(videoQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! av1parse ! video/x-av1,stream-format=obu-stream,alignment=tu ! mux.video_0 ") + } + + case probe.IsVP9(): + if conf.TranscodeVP9 { + r.transcodeToH264(&sb, videoQueueBytes, queueNS) + } else { + sb.WriteString("d.video_0 ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(videoQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! vp9parse ! video/x-vp9,alignment=frame ! mux.video_0 ") + } + } + + sb.WriteString("d.audio_") + sb.WriteString(strconv.Itoa(r.audioIndex)) + sb.WriteString(" ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(audioQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! decodebin ! audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! audiorate skip-to-first=true tolerance=40000000 ! avenc_aac bitrate=") + sb.WriteString(strconv.Itoa(conf.AACBitrateKbps * 1000)) + sb.WriteString(" ! aacparse ! audio/mpeg,mpegversion=4,stream-format=raw,rate=48000,channels=2 ! mux.audio_0 ") + + sb.WriteString("mp4mux name=mux fragment-duration=") + sb.WriteString(strconv.Itoa(conf.SegmentSeconds * 1000)) + sb.WriteString(" streamable=true ! appsink name=out emit-signals=false sync=false max-buffers=1") + if conf.GSTVersion >= 1.28 { + sb.WriteString(" leaky-type=none") + } else { + sb.WriteString(" drop=false") + } + sb.WriteString(" wait-on-eos=false") + + return sb.String() +} + +func (r *gstRunner) writeSourceQueue(sb *strings.Builder, videoQueueBytes int, queueNS int64) { + conf := r.task.Config + + sb.WriteString("! queue2 use-buffering=false max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(videoQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + + if !conf.TempFS { + return + } + + ringBlocks := 3 + conf.TempFSRing + ringBytes := ringBlocks * videoQueueBytes + template := gstPath(queue2TempTemplate()) + + sb.WriteString(" temp-template=\"") + sb.WriteString(template) + sb.WriteString("\" ring-buffer-max-size=") + sb.WriteString(strconv.Itoa(ringBytes)) +} + +func (r *gstRunner) transcodeToH264(sb *strings.Builder, maxQueueBytes int, queueNS int64) { + conf := r.task.Config + video := r.task.Probe.Video() + + frameRateNum := 0 + frameRateDen := 0 + if video != nil { + frameRateNum = video.FrameRateNum + frameRateDen = video.FrameRateDen + } + + keyIntMax := 25 * conf.SegmentSeconds + if frameRateNum > 0 && frameRateDen > 0 { + keyIntMax = maxInt(1, int(math.Round(float64(frameRateNum*conf.SegmentSeconds)/float64(frameRateDen)))) + } + + sb.WriteString("d.video_0 ! queue max-size-buffers=0 max-size-bytes=") + sb.WriteString(strconv.Itoa(maxQueueBytes)) + sb.WriteString(" max-size-time=") + sb.WriteString(strconv.FormatInt(queueNS, 10)) + sb.WriteString(" leaky=0 ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc tune=zerolatency speed-preset=veryfast bitrate=") + sb.WriteString(strconv.Itoa(conf.VideoBitrate)) + sb.WriteString(" key-int-max=") + sb.WriteString(strconv.Itoa(keyIntMax)) + sb.WriteString(" bframes=0 byte-stream=false ! video/x-h264,profile=main,stream-format=avc,alignment=au ! h264parse config-interval=-1 ! h264timestamper ! video/x-h264,profile=main,stream-format=avc,alignment=au ! mux.video_0 ") +} + +func (r *gstRunner) Seek(seconds float64) bool { + r.stopPipeline() + + r.reader.SeekReset(seconds) + r.readySegment.index = -1 + r.readySegment.complete = false + r.readySegment.segment = Segment{} + + if err := r.startPipeline(seconds); err != nil { + r.freezeAtPosition(seconds) + return false + } + + r.frozen.Store(false) + r.setPosition(seconds) + r.positionSeekSeconds = seconds + r.statePlaying = true + return true +} + +func (r *gstRunner) GetSegment(ctx context.Context, index int, audio int) (Segment, error) { + if r.IsFrozen() { + if !r.Seek(r.position()) { + return Segment{}, ErrSegmentNotReady + } + } else if !r.statePlaying { + r.statePlaying = true + r.audioIndex = validAudioIndex(r.task.Probe, audio) + startSeconds := 0.0 + if index > 0 { + startSeconds = float64(index * r.task.Config.SegmentSeconds) + r.reader.SeekReset(startSeconds) + r.positionSeekSeconds = startSeconds + r.setPosition(startSeconds) + } + if err := r.startPipeline(startSeconds); err != nil { + r.freezeAtPosition(startSeconds) + return Segment{}, err + } + } + + if r.readySegment.index == index && r.readySegment.complete { + return r.readySegment.segment, nil + } + + r.readySegment.index = -1 + r.readySegment.complete = false + r.readySegment.segment = Segment{} + + deadline := time.Now().Add(20 * time.Second) + for time.Now().Before(deadline) { + if ctx.Err() != nil { + return Segment{}, ctx.Err() + } + + sample := gstRuntime.appSinkTryPullSample(r.sink, uint64(100*time.Millisecond)) + if sample == 0 { + if gstRuntime.appSinkIsEOS(r.sink) { + completed, err := r.reader.TryProcessDeferred() + if err != nil { + r.freezeAtSegment(index) + return Segment{}, err + } + if completed && r.readySegment.complete { + return r.completeReadySegment(index), nil + } + + completed, err = r.reader.TryFlushFinalSegment() + if err != nil { + r.freezeAtSegment(index) + return Segment{}, err + } + if completed && r.readySegment.complete { + return r.completeReadySegment(index), nil + } + + if seg, ok := r.reader.TakePendingSegment(); ok { + r.readySegment.segment = seg + r.readySegment.complete = true + return r.completeReadySegment(index), nil + } + + r.freezeAtSegment(index) + return Segment{}, ErrSegmentNotReady + } + continue + } + + data := gstRuntime.sampleBytes(sample) + gstRuntime.sampleUnref(sample) + if len(data) == 0 { + continue + } + + if err := r.reader.Push(data); err != nil { + r.freezeAtSegment(index) + return Segment{}, err + } + + if r.readySegment.complete { + return r.completeReadySegment(index), nil + } + } + + return Segment{}, ErrSegmentNotReady +} + +func (r *gstRunner) completeReadySegment(index int) Segment { + if index > 0 { + r.readySegment.index = index + } else { + r.readySegment.index = 0 + } + return r.readySegment.segment +} + +func (r *gstRunner) freezeAtSegment(index int) { + seconds := r.position() + if index > 0 { + seconds = float64(index * r.task.Config.SegmentSeconds) + } + + r.freezeAtPosition(seconds) +} + +func (r *gstRunner) freezeAtPosition(seconds float64) { + r.stopPipeline() + r.reader.SeekReset(seconds) + r.readySegment.index = -1 + r.readySegment.complete = false + r.readySegment.segment = Segment{} + r.frozen.Store(true) + r.setPosition(seconds) + r.positionSeekSeconds = seconds + r.statePlaying = false +} + +func (r *gstRunner) startPipeline(seconds float64) error { + pipeline, err := gstRuntime.parseLaunch(r.createPipelineArgs()) + if err != nil { + return err + } + + sink := gstRuntime.binGetByName(pipeline, "out") + if sink == 0 { + gstRuntime.elementSetState(pipeline, gstStateNull) + gstRuntime.objectUnref(pipeline) + return errors.New("appsink element is not available") + } + + bus := gstRuntime.pipelineGetBus(pipeline) + + if seconds > 0 { + if err := r.setPipelineState(pipeline, bus, gstStatePaused); err != nil { + gstRuntime.elementSetState(pipeline, gstStateNull) + gstRuntime.objectUnref(sink) + gstRuntime.objectUnref(pipeline) + gstRuntime.objectUnref(bus) + return err + } + + if !gstRuntime.elementSeekSimple(pipeline, gstFormatTime, gstSeekFlagFlush|gstSeekFlagKeyUnit|gstSeekFlagSnapAfter, int64(math.Round(seconds*1_000_000_000))) { + gstRuntime.elementSetState(pipeline, gstStateNull) + gstRuntime.objectUnref(sink) + gstRuntime.objectUnref(pipeline) + gstRuntime.objectUnref(bus) + return errors.New("gstreamer seek failed") + } + } + + if err := r.setPipelineState(pipeline, bus, gstStatePlaying); err != nil { + gstRuntime.elementSetState(pipeline, gstStateNull) + gstRuntime.objectUnref(sink) + gstRuntime.objectUnref(pipeline) + gstRuntime.objectUnref(bus) + return err + } + + r.pipeline = pipeline + r.bus = bus + r.sink = sink + return nil +} + +func (r *gstRunner) setPipelineState(pipeline uintptr, bus uintptr, state int32) error { + if ret := gstRuntime.elementSetState(pipeline, state); ret == gstStateChangeFailure { + if err := gstRuntime.popBusError(bus, 0); err != nil { + return err + } + return errors.New("gstreamer state change failed") + } + + if ret := gstRuntime.elementGetState(pipeline, 5*time.Second); ret == gstStateChangeFailure { + if err := gstRuntime.popBusError(bus, 0); err != nil { + return err + } + return errors.New("gstreamer state wait failed") + } + + return nil +} + +func (r *gstRunner) stopPipeline() { + if r.pipeline != 0 { + _ = gstRuntime.elementSetState(r.pipeline, gstStateNull) + } + if r.sink != 0 { + gstRuntime.objectUnref(r.sink) + r.sink = 0 + } + if r.bus != 0 { + gstRuntime.objectUnref(r.bus) + r.bus = 0 + } + if r.pipeline != 0 { + gstRuntime.objectUnref(r.pipeline) + r.pipeline = 0 + } +} + +func (r *gstRunner) Dispose() { + r.stopPipeline() + if r.reader != nil { + r.reader.SeekReset(r.position()) + } + r.readySegment.index = -1 + r.readySegment.complete = false + r.readySegment.segment = Segment{} + r.statePlaying = false +} + +func (r *gstRunner) Frozen() { + r.freezeAtPosition(r.position()) +} + +func (r *gstRunner) IsFrozen() bool { + return r.frozen.Load() +} + +func (r *gstRunner) setPosition(seconds float64) { + r.positionSeconds.Store(math.Float64bits(seconds)) +} + +func (r *gstRunner) position() float64 { + return math.Float64frombits(r.positionSeconds.Load()) +} + +func validAudioIndex(probe ProbeInfo, requested int) int { + for _, track := range probe.Tracks { + if track.Type == "audio" && track.Index == requested { + return requested + } + } + return 0 +} diff --git a/server/gstreamer/pipeline_stub.go b/server/gstreamer/pipeline_stub.go new file mode 100644 index 000000000..bd20361e8 --- /dev/null +++ b/server/gstreamer/pipeline_stub.go @@ -0,0 +1,20 @@ +//go:build !(windows && (amd64 || arm64)) && !(linux && (amd64 || arm64)) && !(darwin && (amd64 || arm64)) + +package gstreamer + +import "context" + +type disabledRunner struct{} + +func newPipelineRunner(_ *Task, _ int) (pipelineRunner, error) { + return nil, ErrPipelineDisabled +} + +func (disabledRunner) GetSegment(context.Context, int, int) (Segment, error) { + return Segment{}, ErrPipelineDisabled +} + +func (disabledRunner) Seek(float64) bool { return false } +func (disabledRunner) Frozen() {} +func (disabledRunner) Dispose() {} +func (disabledRunner) IsFrozen() bool { return false } diff --git a/server/gstreamer/probe.go b/server/gstreamer/probe.go new file mode 100644 index 000000000..9b34cc463 --- /dev/null +++ b/server/gstreamer/probe.go @@ -0,0 +1,243 @@ +package gstreamer + +import ( + "fmt" + "math" + "strconv" + "strings" + "time" + + "server/ffprobe" + + probedata "gopkg.in/vansante/go-ffprobe.v2" +) + +const gstProbeTimeout = 30 * time.Second + +type ProbeInfo struct { + DurationNS int64 + Tracks []TrackInfo +} + +type TrackInfo struct { + Index int + PadName string + + Type string + CapsName string + + Title string + Language string + + Width int + Height int + Channels int + Rate int + + FrameRateNum int + FrameRateDen int +} + +func (p ProbeInfo) DurationSeconds() int { + if p.DurationNS <= 0 { + return 0 + } + return int(float64(p.DurationNS) / 1_000_000_000.0) +} + +func (p ProbeInfo) Video() *TrackInfo { + for i := range p.Tracks { + t := &p.Tracks[i] + if t.Type == "video" || + t.CapsName == "video/x-h264" || + t.CapsName == "video/x-h265" || + t.CapsName == "video/x-av1" || + t.CapsName == "video/x-vp9" || + t.CapsName == "video/x-vp8" { + return t + } + } + return nil +} + +func (p ProbeInfo) VideoCapsName() string { + if v := p.Video(); v != nil { + return v.CapsName + } + return "" +} + +func (p ProbeInfo) IsH264() bool { return p.VideoCapsName() == "video/x-h264" } +func (p ProbeInfo) IsH265() bool { return p.VideoCapsName() == "video/x-h265" } +func (p ProbeInfo) IsAV1() bool { return p.VideoCapsName() == "video/x-av1" } +func (p ProbeInfo) IsVP9() bool { return p.VideoCapsName() == "video/x-vp9" } +func (p ProbeInfo) IsVP8() bool { return p.VideoCapsName() == "video/x-vp8" } + +func probeSource(sourceURL string) (ProbeInfo, error) { + data, err := ffprobe.ProbeUrlWithTimeout(sourceURL, gstProbeTimeout) + if err != nil { + return ProbeInfo{}, err + } + return probeFromFFProbe(data), nil +} + +func probeFromFFProbe(data *probedata.ProbeData) ProbeInfo { + var probe ProbeInfo + if data == nil { + return probe + } + + if data.Format != nil && data.Format.DurationSeconds > 0 { + probe.DurationNS = int64(data.Format.DurationSeconds * 1_000_000_000) + } + + videoIndex := 0 + audioIndex := 0 + + for _, stream := range data.Streams { + if stream == nil { + continue + } + + switch stream.CodecType { + case "video": + track := TrackInfo{ + Index: videoIndex, + PadName: "video_" + strconv.Itoa(videoIndex), + Type: "video", + CapsName: codecToCapsName("video", stream.CodecName, stream.CodecLongName), + Width: stream.Width, + Height: stream.Height, + Title: tagString(stream.TagList, "title"), + Language: tagString(stream.TagList, "language"), + FrameRateNum: 0, + FrameRateDen: 0, + } + track.FrameRateNum, track.FrameRateDen = parseRate(firstNonEmpty(stream.AvgFrameRate, stream.RFrameRate)) + probe.Tracks = append(probe.Tracks, track) + videoIndex++ + + case "audio": + rate, _ := strconv.Atoi(stream.SampleRate) + track := TrackInfo{ + Index: audioIndex, + PadName: "audio_" + strconv.Itoa(audioIndex), + Type: "audio", + CapsName: codecToCapsName("audio", stream.CodecName, stream.CodecLongName), + Channels: stream.Channels, + Rate: rate, + Title: tagString(stream.TagList, "title"), + Language: tagString(stream.TagList, "language"), + } + probe.Tracks = append(probe.Tracks, track) + audioIndex++ + } + } + + return probe +} + +func codecToCapsName(kind string, values ...string) string { + codec := strings.ToLower(strings.Join(values, " ")) + if codec == "" { + return "" + } + + if kind == "video" { + switch { + case strings.Contains(codec, "h264") || strings.Contains(codec, "h.264") || strings.Contains(codec, "avc"): + return "video/x-h264" + case strings.Contains(codec, "hevc") || strings.Contains(codec, "h265") || strings.Contains(codec, "h.265"): + return "video/x-h265" + case strings.Contains(codec, "av1"): + return "video/x-av1" + case strings.Contains(codec, "vp9"): + return "video/x-vp9" + case strings.Contains(codec, "vp8"): + return "video/x-vp8" + } + } + + if kind == "audio" { + switch { + case strings.Contains(codec, "eac3") || strings.Contains(codec, "e-ac-3") || strings.Contains(codec, "e-ac3"): + return "audio/x-eac3" + case strings.Contains(codec, "ac3") || strings.Contains(codec, "ac-3") || strings.Contains(codec, "a/52"): + return "audio/x-ac3" + case strings.Contains(codec, "aac"): + return "audio/mpeg" + case strings.Contains(codec, "opus"): + return "audio/x-opus" + case strings.Contains(codec, "vorbis"): + return "audio/x-vorbis" + case strings.Contains(codec, "flac"): + return "audio/x-flac" + case strings.Contains(codec, "mpeg") || strings.Contains(codec, "mp3"): + return "audio/mpeg" + } + } + + return "" +} + +func parseRate(value string) (int, int) { + value = strings.TrimSpace(value) + if value == "" || value == "0/0" { + return 0, 0 + } + + parts := strings.SplitN(value, "/", 2) + num, err := strconv.Atoi(strings.TrimSpace(parts[0])) + if err != nil || num <= 0 { + return 0, 0 + } + + den := 1 + if len(parts) == 2 { + den, err = strconv.Atoi(strings.TrimSpace(parts[1])) + if err != nil || den <= 0 { + return 0, 0 + } + } + + if math.IsInf(float64(num)/float64(den), 0) { + return 0, 0 + } + + return num, den +} + +func tagString(tags probedata.Tags, key string) string { + if tags == nil { + return "" + } + if value, err := tags.GetString(key); err == nil { + return value + } + for k, raw := range tags { + if strings.EqualFold(k, key) { + return strings.TrimSpace(strings.Trim(rawString(raw), `"`)) + } + } + return "" +} + +func rawString(value interface{}) string { + switch v := value.(type) { + case string: + return v + case []byte: + return string(v) + default: + return fmt.Sprint(v) + } +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return value + } + } + return "" +} diff --git a/server/gstreamer/segment.go b/server/gstreamer/segment.go new file mode 100644 index 000000000..7cb31a910 --- /dev/null +++ b/server/gstreamer/segment.go @@ -0,0 +1,31 @@ +package gstreamer + +import "bytes" + +type Segment struct { + Data []byte + StartSeconds float64 +} + +func (s Segment) Len() int { + return len(s.Data) +} + +func cloneBytes(src []byte) []byte { + if len(src) == 0 { + return nil + } + dst := make([]byte, len(src)) + copy(dst, src) + return dst +} + +func takeBuffer(buf *bytes.Buffer) []byte { + if buf.Len() == 0 { + return nil + } + + data := buf.Bytes() + *buf = bytes.Buffer{} + return data +} diff --git a/server/gstreamer/service.go b/server/gstreamer/service.go new file mode 100644 index 000000000..2a5318bda --- /dev/null +++ b/server/gstreamer/service.go @@ -0,0 +1,218 @@ +package gstreamer + +import ( + "errors" + "net/url" + "sync" + "sync/atomic" + "time" + + "server/settings" +) + +var ( + ErrBadSource = errors.New("bad gstreamer source") + ErrUnsupportedVideo = errors.New("unsupported video codec") + ErrProbeUnavailable = errors.New("ffprobe returned no stream info") + ErrPipelineDisabled = errors.New("gstreamer support is not built in") + ErrSegmentNotReady = errors.New("segment is not ready") + ErrTaskNotFound = errors.New("gstreamer task not found") + ErrInvalidIdentifier = errors.New("invalid gstreamer task id") +) + +type Service struct { + conf Config + + mu sync.RWMutex + tasks map[string]*Task + + cleanupRunning atomic.Bool + stopCleanup chan struct{} +} + +func NewService(conf Config) *Service { + service := &Service{ + conf: conf.normalized(), + tasks: make(map[string]*Task), + stopCleanup: make(chan struct{}), + } + cleanupGSTTempFiles() + go service.cleanupLoop() + return service +} + +func (s *Service) GetOrAdd(hash string, fileID string, audio int) (*Task, error) { + if hash == "" || fileID == "" { + return nil, ErrBadSource + } + + sourceURL := sourceURL(s.conf, hash, fileID) + id := hash + + s.mu.RLock() + task := s.tasks[id] + s.mu.RUnlock() + + if task != nil && task.FileID == fileID && task.Audio == audio { + task.UpdateLastActive() + return task, nil + } + + probe, err := probeSource(sourceURL) + if err != nil { + return nil, err + } + if len(probe.Tracks) == 0 || probe.Video() == nil { + return nil, ErrProbeUnavailable + } + if !probe.IsH264() && !probe.IsH265() && !probe.IsAV1() && !probe.IsVP9() { + return nil, ErrUnsupportedVideo + } + + task, err = NewTask(id, hash, fileID, audio, sourceURL, probe, s.conf) + if err != nil { + return nil, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + if existing := s.tasks[id]; existing != nil { + if existing.FileID == fileID && existing.Audio == audio { + task.Dispose() + existing.UpdateLastActive() + return existing, nil + } + existing.Dispose() + } + + s.tasks[id] = task + return task, nil +} + +func (s *Service) Get(id string) *Task { + if id == "" { + return nil + } + + s.mu.RLock() + task := s.tasks[id] + s.mu.RUnlock() + + if task == nil { + return nil + } + + task.UpdateLastActive() + return task +} + +func (s *Service) TryRemove(id string) bool { + if id == "" { + return false + } + + s.mu.Lock() + task := s.tasks[id] + if task != nil { + delete(s.tasks, id) + } + s.mu.Unlock() + + if task == nil { + return false + } + + task.Dispose() + if s.isEmpty() { + cleanupGSTTempFiles() + } + return true +} + +func (s *Service) Dispose() { + closeOnce(s.stopCleanup) + + s.mu.Lock() + tasks := s.tasks + s.tasks = make(map[string]*Task) + s.mu.Unlock() + + for _, task := range tasks { + task.Dispose() + } +} + +func (s *Service) cleanupLoop() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.cleanupInactive() + case <-s.stopCleanup: + return + } + } +} + +func (s *Service) cleanupInactive() { + if !s.cleanupRunning.CompareAndSwap(false, true) { + return + } + defer s.cleanupRunning.Store(false) + + now := time.Now().UTC() + + s.mu.RLock() + snapshot := make(map[string]*Task, len(s.tasks)) + for id, task := range s.tasks { + snapshot[id] = task + } + s.mu.RUnlock() + + inactiveDuration := s.conf.inactiveDuration() + removeAfter := inactiveDuration + 20*time.Minute + + for id, task := range snapshot { + lastActive := task.LastActive() + if now.After(lastActive.Add(removeAfter)) { + s.TryRemove(id) + continue + } + if !task.IsFrozen() && now.After(lastActive.Add(inactiveDuration)) { + task.Frozen() + } + } + + if s.isEmpty() { + cleanupGSTTempFiles() + } +} + +func (s *Service) isEmpty() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.tasks) == 0 +} + +func sourceURL(conf Config, hash string, fileID string) string { + if conf.normalized().Source == "play" { + return playURL(hash, fileID) + } + return streamURL(hash, fileID) +} + +func streamURL(hash string, fileID string) string { + return "http://127.0.0.1:" + settings.Port + "/stream/?link=" + url.QueryEscape(hash) + "&index=" + url.QueryEscape(fileID) + "&play" +} + +func playURL(hash string, fileID string) string { + return "http://127.0.0.1:" + settings.Port + "/play/" + url.PathEscape(hash) + "/" + url.PathEscape(fileID) +} + +func closeOnce(ch chan struct{}) { + defer func() { _ = recover() }() + close(ch) +} diff --git a/server/gstreamer/task.go b/server/gstreamer/task.go new file mode 100644 index 000000000..e05451fbc --- /dev/null +++ b/server/gstreamer/task.go @@ -0,0 +1,184 @@ +package gstreamer + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type pipelineRunner interface { + GetSegment(ctx context.Context, index int, audio int) (Segment, error) + Seek(seconds float64) bool + Frozen() + Dispose() + IsFrozen() bool +} + +type Task struct { + ID string + Hash string + FileID string + Audio int + SourceURL string + Probe ProbeInfo + Config Config + + LastSentSegment int + + initMu sync.RWMutex + initMP4 []byte + + activeMu sync.RWMutex + lastActive time.Time + + mu sync.Mutex + runner pipelineRunner + + disposed atomic.Bool +} + +func NewTask(id string, hash string, fileID string, audio int, sourceURL string, probe ProbeInfo, conf Config) (*Task, error) { + task := &Task{ + ID: id, + Hash: hash, + FileID: fileID, + Audio: audio, + SourceURL: sourceURL, + Probe: probe, + Config: conf.normalized(), + LastSentSegment: -1, + lastActive: time.Now().UTC(), + } + + runner, err := newPipelineRunner(task, audio) + if err != nil { + return nil, err + } + task.runner = runner + return task, nil +} + +func (t *Task) UpdateLastActive() { + t.activeMu.Lock() + t.lastActive = time.Now().UTC() + t.activeMu.Unlock() +} + +func (t *Task) LastActive() time.Time { + t.activeMu.RLock() + defer t.activeMu.RUnlock() + return t.lastActive +} + +func (t *Task) InitMP4() []byte { + t.initMu.RLock() + defer t.initMu.RUnlock() + return cloneBytes(t.initMP4) +} + +func (t *Task) setInitMP4(data []byte) { + t.initMu.Lock() + t.initMP4 = cloneBytes(data) + t.initMu.Unlock() +} + +func (t *Task) EnsureInit(ctx context.Context, audio int, startIndex int) error { + t.mu.Lock() + defer t.mu.Unlock() + + if startIndex < 0 { + startIndex = 0 + } + if len(t.InitMP4()) > 0 && (startIndex == 0 || t.LastSentSegment != -1) { + return nil + } + if t.runner == nil { + return ErrTaskNotFound + } + + index := -1 + if startIndex > 0 { + index = startIndex + } + _, err := t.runner.GetSegment(ctx, index, audio) + if err == nil && startIndex > 0 && t.LastSentSegment == -1 { + t.LastSentSegment = startIndex - 1 + } + return err +} + +func (t *Task) Segment(ctx context.Context, index int, audio int) (Segment, error) { + t.mu.Lock() + defer t.mu.Unlock() + + if t.runner == nil { + return Segment{}, ErrTaskNotFound + } + + if t.LastSentSegment != -1 && t.LastSentSegment != index { + if index != t.LastSentSegment+1 { + diff := index - t.LastSentSegment + cutoff := t.Config.PipelineVideoQueue + + if diff > 0 && maxInt(60, cutoff) >= diff*t.Config.SegmentSeconds { + for i := 0; i < diff-1; i++ { + if ctx.Err() != nil { + return Segment{}, ctx.Err() + } + + t.LastSentSegment++ + if _, err := t.runner.GetSegment(ctx, t.LastSentSegment, audio); err != nil { + t.LastSentSegment-- + return Segment{}, err + } + } + } else { + if !t.runner.Seek(float64(index * t.Config.SegmentSeconds)) { + return Segment{}, ErrSegmentNotReady + } + } + } + } + + seg, err := t.runner.GetSegment(ctx, index, audio) + if err != nil { + return Segment{}, err + } + + t.LastSentSegment = index + return seg, nil +} + +func (t *Task) Frozen() { + t.mu.Lock() + defer t.mu.Unlock() + if t.runner != nil { + t.runner.Frozen() + } +} + +func (t *Task) Dispose() { + if !t.disposed.CompareAndSwap(false, true) { + return + } + + t.mu.Lock() + defer t.mu.Unlock() + if t.runner != nil { + t.runner.Dispose() + t.runner = nil + } + t.setInitMP4(nil) +} + +func (t *Task) IsFrozen() bool { + return t.runner != nil && t.runner.IsFrozen() +} + +func maxInt(a int, b int) int { + if a > b { + return a + } + return b +} diff --git a/server/gstreamer/temp.go b/server/gstreamer/temp.go new file mode 100644 index 000000000..5836aa395 --- /dev/null +++ b/server/gstreamer/temp.go @@ -0,0 +1,49 @@ +package gstreamer + +import ( + "os" + "path/filepath" + "strings" +) + +const queue2TempPrefix = "gst-" + +func queue2TempTemplate() string { + return filepath.Join(queue2TempDir(), queue2TempPrefix+"XXXXXX") +} + +func queue2TempDir() string { + dir := os.TempDir() + if exe, err := os.Executable(); err == nil { + if resolved, err := filepath.EvalSymlinks(exe); err == nil { + exe = resolved + } + if exeDir := filepath.Dir(exe); exeDir != "." && exeDir != "" { + dir = exeDir + } + } + return dir +} + +func gstPath(path string) string { + return strings.ReplaceAll(filepath.ToSlash(path), `"`, `\"`) +} + +func cleanupGSTTempFiles() { + dir := queue2TempDir() + entries, err := os.ReadDir(dir) + if err != nil { + return + } + + for _, entry := range entries { + if entry.IsDir() || !isGSTTempFileName(entry.Name()) { + continue + } + _ = os.Remove(filepath.Join(dir, entry.Name())) + } +} + +func isGSTTempFileName(name string) bool { + return len(name) == len(queue2TempPrefix)+6 && strings.HasPrefix(name, queue2TempPrefix) +} diff --git a/server/web/api/torrents.go b/server/web/api/torrents.go index cdafeafa1..e1f8a8151 100644 --- a/server/web/api/torrents.go +++ b/server/web/api/torrents.go @@ -6,6 +6,7 @@ import ( "strings" "server/dlna" + "server/gstreamer" "server/log" set "server/settings" "server/torr" @@ -185,6 +186,7 @@ func remTorrent(req torrReqJS, c *gin.Context) { return } torr.RemTorrent(req.Hash) + gstreamer.Remove(req.Hash) // TODO: remove if set.BTsets.EnableDLNA { dlna.Stop() @@ -212,13 +214,16 @@ func dropTorrent(req torrReqJS, c *gin.Context) { return } torr.DropTorrent(req.Hash) + gstreamer.Remove(req.Hash) c.Status(200) } func wipeTorrents(c *gin.Context) { torrents := torr.ListTorrent() for _, t := range torrents { - torr.RemTorrent(t.TorrentSpec.InfoHash.HexString()) + hash := t.TorrentSpec.InfoHash.HexString() + torr.RemTorrent(hash) + gstreamer.Remove(hash) } // TODO: remove (copied todo from remTorrent()) if set.BTsets.EnableDLNA { diff --git a/server/web/server.go b/server/web/server.go index 2a8857186..44a512c8f 100644 --- a/server/web/server.go +++ b/server/web/server.go @@ -3,6 +3,7 @@ package web import ( "net" "os" + "server/gstreamer" "server/proxy" "sort" @@ -81,6 +82,7 @@ func Start() { route.GET("/echo", echo) api.SetupRoute(route) + gstreamer.SetupRoute(route) msx.SetupRoute(route) pages.SetupRoute(route) if settings.Args.WebDAV { @@ -135,6 +137,7 @@ func Wait() error { } func Stop() { + gstreamer.Stop() dlna.Stop() // Unmount FUSE filesystem if mounted fuse.FuseCleanup()