diff --git a/config/config.go b/config/config.go
index 134743b8..aa7211c6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -54,6 +54,8 @@ type Config struct {
TurnExternalSecret string `split_words:"true"`
TrustProxyHeaders bool `split_words:"true"`
+ SFUMode bool `default:"false" split_words:"true"`
+ TestClient bool `default:"false" split_words:"true"`
AuthMode string `default:"turn" split_words:"true"`
CorsAllowedOrigins []string `split_words:"true"`
UsersFile string `split_words:"true"`
diff --git a/go.mod b/go.mod
index 51785474..8b818981 100644
--- a/go.mod
+++ b/go.mod
@@ -9,8 +9,11 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/kelseyhightower/envconfig v1.4.0
+ github.com/pion/interceptor v0.1.45
github.com/pion/randutil v0.1.0
- github.com/pion/turn/v4 v4.1.4
+ github.com/pion/rtcp v1.2.16
+ github.com/pion/turn/v5 v5.0.9
+ github.com/pion/webrtc/v4 v4.2.15
github.com/prometheus/client_golang v1.23.2
github.com/rs/xid v1.6.0
github.com/rs/zerolog v1.35.1
@@ -27,16 +30,22 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
+ github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
- github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
- github.com/pion/dtls/v3 v3.0.7 // indirect
+ github.com/pion/datachannel v1.6.0 // indirect
+ github.com/pion/dtls/v3 v3.1.4 // indirect
+ github.com/pion/ice/v4 v4.2.7 // indirect
github.com/pion/logging v0.2.4 // indirect
- github.com/pion/stun/v3 v3.0.1 // indirect
- github.com/pion/transport/v3 v3.1.1 // indirect
- github.com/pion/transport/v4 v4.0.1 // indirect
+ github.com/pion/mdns/v2 v2.1.0 // indirect
+ github.com/pion/rtp v1.10.2 // indirect
+ github.com/pion/sctp v1.10.0 // indirect
+ github.com/pion/sdp/v3 v3.0.18 // indirect
+ github.com/pion/srtp/v3 v3.0.11 // indirect
+ github.com/pion/stun/v3 v3.1.5 // indirect
+ github.com/pion/transport/v4 v4.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
@@ -44,7 +53,9 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
+ golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.43.0 // indirect
+ golang.org/x/time v0.14.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/go.sum b/go.sum
index b4a089be..4b44a377 100644
--- a/go.sum
+++ b/go.sum
@@ -5,7 +5,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3sHPnBo=
github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
-github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -15,6 +14,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
@@ -43,20 +44,40 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
-github.com/pion/dtls/v3 v3.0.7 h1:bItXtTYYhZwkPFk4t1n3Kkf5TDrfj6+4wG+CZR8uI9Q=
-github.com/pion/dtls/v3 v3.0.7/go.mod h1:uDlH5VPrgOQIw59irKYkMudSFprY9IEFCqz/eTz16f8=
+github.com/pion/datachannel v1.6.0 h1:XecBlj+cvsxhAMZWFfFcPyUaDZtd7IJvrXqlXD/53i0=
+github.com/pion/datachannel v1.6.0/go.mod h1:ur+wzYF8mWdC+Mkis5Thosk+u/VOL287apDNEbFpsIk=
+github.com/pion/dtls/v3 v3.1.4 h1:QhvtMflMfu9Kf0RcDC5BJBle4caPskByrKQR6uuYqpY=
+github.com/pion/dtls/v3 v3.1.4/go.mod h1:cr/qotLISUw/9C1m83ZPNZtj9WnXkYLpfCptPqbkInc=
+github.com/pion/ice/v4 v4.2.7 h1:zDEbC6MiEdhQpF8TxBOTws+NU6ZgGpveHrQq4Lc1kao=
+github.com/pion/ice/v4 v4.2.7/go.mod h1:9SNPaq0c7El/ki8leJzyCkK10zsskprR3zTNbO3monY=
+github.com/pion/interceptor v0.1.45 h1:6PUo/5829bIfRFIPPJQzuDn8EjxRTSB/CSD7QVCOaqo=
+github.com/pion/interceptor v0.1.45/go.mod h1:gNDYM/uFKcLe/B3gS2/7+aw6z+RDiMy2qKTnF1LO31w=
github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8=
github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so=
+github.com/pion/mdns/v2 v2.1.0 h1:3IJ9+Xio6tWYjhN6WwuY142P/1jA0D5ERaIqawg/fOY=
+github.com/pion/mdns/v2 v2.1.0/go.mod h1:pcez23GdynwcfRU1977qKU0mDxSeucttSHbCSfFOd9A=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
-github.com/pion/stun/v3 v3.0.1 h1:jx1uUq6BdPihF0yF33Jj2mh+C9p0atY94IkdnW174kA=
-github.com/pion/stun/v3 v3.0.1/go.mod h1:RHnvlKFg+qHgoKIqtQWMOJF52wsImCAf/Jh5GjX+4Tw=
+github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo=
+github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo=
+github.com/pion/rtp v1.10.2 h1:l+f6tTDcAH6xwepaAoW791ddhuYsJlqRATOzirO04Mo=
+github.com/pion/rtp v1.10.2/go.mod h1:Au8fc6cEByy8RLTwKTQTEeQqDB/SJDxwL4mZuxYA5Pk=
+github.com/pion/sctp v1.10.0 h1:qeoD6swF/2M5bYRcAGayqSbTKX3m4AW29CiQxG1+Pfg=
+github.com/pion/sctp v1.10.0/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw=
+github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI=
+github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8=
+github.com/pion/srtp/v3 v3.0.11 h1:GiESUr54/K4UuPigfq/CvWUed80JenQAHXn0C2MQQIQ=
+github.com/pion/srtp/v3 v3.0.11/go.mod h1:EeZOi/sd6glM1EXapg051gdNWO9yWT1YSsgQ4SlJkns=
+github.com/pion/stun/v3 v3.1.5 h1:Y1FHlhaI6+4UoC5i/zQf4F7JvdZtB24/05oyy/GF1x8=
+github.com/pion/stun/v3 v3.1.5/go.mod h1:zRUghXSQU32Lx5orJsz3uYMkIihweXb3mu5gIns02fs=
github.com/pion/transport/v3 v3.1.1 h1:Tr684+fnnKlhPceU+ICdrw6KKkTms+5qHMgw6bIkYOM=
github.com/pion/transport/v3 v3.1.1/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ=
-github.com/pion/transport/v4 v4.0.1 h1:sdROELU6BZ63Ab7FrOLn13M6YdJLY20wldXW2Cu2k8o=
-github.com/pion/transport/v4 v4.0.1/go.mod h1:nEuEA4AD5lPdcIegQDpVLgNoDGreqM/YqmEx3ovP4jM=
-github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ=
-github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ=
+github.com/pion/transport/v4 v4.0.2 h1:ifYlPqNwsy6aKQ9y8yzxXlHae5431ZrH2avkD/Rn6Tk=
+github.com/pion/transport/v4 v4.0.2/go.mod h1:06hFI+jCFcok2X2MekVufNZ/uzNZXivGBPfviSVcjgM=
+github.com/pion/turn/v5 v5.0.9 h1:zNeBfRyzGn7MPyUTvmvxeltLEjlFdSLPT1tlakoaOXM=
+github.com/pion/turn/v5 v5.0.9/go.mod h1:u3XjBqy2Z4+NhCUpDoOSsNuQDrPLvKStlCGWk6sTQ1E=
+github.com/pion/webrtc/v4 v4.2.15 h1:Ir/MauNFCfg+kgyBYPQLiGdVWFlzEcLxqtuzAkYkky0=
+github.com/pion/webrtc/v4 v4.2.15/go.mod h1:CPTcyLfIzC4scOkQ4UY4pj6WvbUGhcNLIpK28cP5h6M=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
@@ -104,8 +125,8 @@ golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY=
golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
-golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
-golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
+golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
+golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/router/router.go b/router/router.go
index 99eaabbd..a0e28eb6 100644
--- a/router/router.go
+++ b/router/router.go
@@ -71,7 +71,7 @@ func Router(conf config.Config, rooms *ws.Rooms, users *auth.Users, version stri
router.Methods("GET").Path("/metrics").Handler(basicAuth(promhttp.Handler(), users))
}
- ui.Register(router)
+ ui.Register(router, conf.TestClient)
return router
}
diff --git a/turn/none.go b/turn/none.go
index 24a2ef9b..44bbdba2 100644
--- a/turn/none.go
+++ b/turn/none.go
@@ -2,8 +2,10 @@ package turn
import (
"errors"
+ "fmt"
"net"
- "strconv"
+
+ "github.com/pion/turn/v5"
)
type RelayAddressGeneratorNone struct{}
@@ -12,8 +14,8 @@ func (r *RelayAddressGeneratorNone) Validate() error {
return nil
}
-func (r *RelayAddressGeneratorNone) AllocatePacketConn(network string, requestedPort int) (net.PacketConn, net.Addr, error) {
- conn, err := net.ListenPacket("udp", ":"+strconv.Itoa(requestedPort))
+func (r *RelayAddressGeneratorNone) AllocatePacketConn(conf turn.AllocateListenerConfig) (net.PacketConn, net.Addr, error) {
+ conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", conf.RequestedPort))
if err != nil {
return nil, nil, err
}
@@ -21,6 +23,10 @@ func (r *RelayAddressGeneratorNone) AllocatePacketConn(network string, requested
return conn, conn.LocalAddr(), nil
}
-func (r *RelayAddressGeneratorNone) AllocateConn(network string, requestedPort int) (net.Conn, net.Addr, error) {
- return nil, nil, errors.New("todo")
+func (r *RelayAddressGeneratorNone) AllocateListener(_ turn.AllocateListenerConfig) (net.Listener, net.Addr, error) {
+ return nil, nil, errors.New("TCP relay not supported")
+}
+
+func (r *RelayAddressGeneratorNone) AllocateConn(_ turn.AllocateConnConfig) (net.Conn, error) {
+ return nil, errors.New("TCP relay not supported")
}
diff --git a/turn/portrange.go b/turn/portrange.go
index bd7ff79a..903c0caf 100644
--- a/turn/portrange.go
+++ b/turn/portrange.go
@@ -6,6 +6,7 @@ import (
"net"
"github.com/pion/randutil"
+ "github.com/pion/turn/v5"
)
type RelayAddressGeneratorPortRange struct {
@@ -22,9 +23,9 @@ func (r *RelayAddressGeneratorPortRange) Validate() error {
return nil
}
-func (r *RelayAddressGeneratorPortRange) AllocatePacketConn(network string, requestedPort int) (net.PacketConn, net.Addr, error) {
- if requestedPort != 0 {
- conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", requestedPort))
+func (r *RelayAddressGeneratorPortRange) AllocatePacketConn(conf turn.AllocateListenerConfig) (net.PacketConn, net.Addr, error) {
+ if conf.RequestedPort != 0 {
+ conn, err := net.ListenPacket("udp", fmt.Sprintf(":%d", conf.RequestedPort))
if err != nil {
return nil, nil, err
}
@@ -46,6 +47,10 @@ func (r *RelayAddressGeneratorPortRange) AllocatePacketConn(network string, requ
return nil, nil, errors.New("could not find free port: max retries exceeded")
}
-func (r *RelayAddressGeneratorPortRange) AllocateConn(network string, requestedPort int) (net.Conn, net.Addr, error) {
- return nil, nil, errors.New("todo")
+func (r *RelayAddressGeneratorPortRange) AllocateListener(_ turn.AllocateListenerConfig) (net.Listener, net.Addr, error) {
+ return nil, nil, errors.New("TCP relay not supported")
+}
+
+func (r *RelayAddressGeneratorPortRange) AllocateConn(_ turn.AllocateConnConfig) (net.Conn, error) {
+ return nil, errors.New("TCP relay not supported")
}
diff --git a/turn/server.go b/turn/server.go
index 9d78e2a9..97967a34 100644
--- a/turn/server.go
+++ b/turn/server.go
@@ -9,7 +9,7 @@ import (
"sync"
"time"
- "github.com/pion/turn/v4"
+ "github.com/pion/turn/v5"
"github.com/rs/zerolog/log"
"github.com/screego/server/config"
"github.com/screego/server/config/ipdns"
@@ -43,8 +43,8 @@ type Generator struct {
IPProvider ipdns.Provider
}
-func (r *Generator) AllocatePacketConn(network string, requestedPort int) (net.PacketConn, net.Addr, error) {
- conn, addr, err := r.RelayAddressGenerator.AllocatePacketConn(network, requestedPort)
+func (r *Generator) AllocatePacketConn(conf turn.AllocateListenerConfig) (net.PacketConn, net.Addr, error) {
+ conn, addr, err := r.RelayAddressGenerator.AllocatePacketConn(conf)
if err != nil {
return conn, addr, err
}
@@ -155,19 +155,19 @@ func (a *ExternalServer) Disallow(username string) {
// not supported, will expire on TTL
}
-func (a *InternalServer) authenticate(username, realm string, addr net.Addr) ([]byte, bool) {
+func (a *InternalServer) authenticate(ra *turn.RequestAttributes) (string, []byte, bool) {
a.lock.RLock()
defer a.lock.RUnlock()
- entry, ok := a.lookup[username]
+ entry, ok := a.lookup[ra.Username]
if !ok {
- log.Debug().Interface("addr", addr).Str("username", username).Msg("TURN username not found")
- return nil, false
+ log.Debug().Interface("addr", ra.SrcAddr).Str("username", ra.Username).Msg("TURN username not found")
+ return "", nil, false
}
- log.Debug().Interface("addr", addr.String()).Str("realm", realm).Msg("TURN authenticated")
- return entry.password, true
+ log.Debug().Str("addr", ra.SrcAddr.String()).Str("realm", ra.Realm).Msg("TURN authenticated")
+ return ra.Username, entry.password, true
}
func (a *InternalServer) Credentials(id string, addr net.IP) (string, string) {
diff --git a/ui/public/test-client.html b/ui/public/test-client.html
new file mode 100644
index 00000000..8428c17a
--- /dev/null
+++ b/ui/public/test-client.html
@@ -0,0 +1,205 @@
+
+
+
+
+screego SFU test client
+
+
+
+screego SFU test client
+
+
+
+
+
+
+
+
+Disconnected
+
+
+
+
+
+
diff --git a/ui/serve.go b/ui/serve.go
index f3c839e3..594eb87b 100644
--- a/ui/serve.go
+++ b/ui/serve.go
@@ -15,11 +15,14 @@ var buildFiles embed.FS
var files, _ = fs.Sub(buildFiles, "build")
// Register registers the ui on the root path.
-func Register(r *mux.Router) {
+func Register(r *mux.Router, testClient bool) {
r.Handle("/", serveFile("index.html", "text/html"))
r.Handle("/index.html", serveFile("index.html", "text/html"))
r.Handle("/assets/{resource}", http.FileServer(http.FS(files)))
+ if testClient {
+ r.Handle("/test-client.html", serveFile("test-client.html", "text/html"))
+ }
r.Handle("/favicon.ico", serveFile("favicon.ico", "image/x-icon"))
r.Handle("/logo.svg", serveFile("logo.svg", "image/svg+xml"))
r.Handle("/apple-touch-icon.png", serveFile("apple-touch-icon.png", "image/png"))
diff --git a/ui/src/Room.tsx b/ui/src/Room.tsx
index de04cf8f..72e8e559 100644
--- a/ui/src/Room.tsx
+++ b/ui/src/Room.tsx
@@ -7,6 +7,9 @@ import PeopleIcon from '@mui/icons-material/People';
import VolumeMuteIcon from '@mui/icons-material/VolumeOff';
import VolumeIcon from '@mui/icons-material/VolumeUp';
import SettingsIcon from '@mui/icons-material/Settings';
+import CloseIcon from '@mui/icons-material/Close';
+import VisibilityIcon from '@mui/icons-material/Visibility';
+import VisibilityOffIcon from '@mui/icons-material/VisibilityOff';
import {useHotkeys} from 'react-hotkeys-hook';
import {Video} from './Video';
import {makeStyles} from 'tss-react/mui';
@@ -58,11 +61,15 @@ export const Room = ({
share,
stopShare,
setName,
+ subscribeStream,
+ unsubscribeStream,
}: {
state: ConnectedRoom;
share: () => void;
stopShare: () => void;
setName: (name: string) => void;
+ subscribeStream: (userID: string) => void;
+ unsubscribeStream: (sessionID: string) => void;
}) => {
const {classes} = useStyles();
const [open, setOpen] = React.useState(false);
@@ -70,8 +77,10 @@ export const Room = ({
const [settings, setSettings] = useSettings();
const [showControl, setShowControl] = React.useState(true);
const [hoverControl, setHoverControl] = React.useState(false);
+ const [showStrip, setShowStrip] = React.useState(true);
const [selectedStream, setSelectedStream] = React.useState();
const [videoElement, setVideoElement] = React.useState(null);
+ const explicitDeselect = React.useRef(false);
useShowOnMouseMovement(setShowControl);
@@ -79,18 +88,30 @@ export const Room = ({
React.useEffect(() => {
if (selectedStream === HostStream && state.hostStream) {
+ explicitDeselect.current = false;
return;
}
if (state.clientStreams.some(({id}) => id === selectedStream)) {
+ explicitDeselect.current = false;
return;
}
if (state.clientStreams.length === 0 && selectedStream) {
setSelectedStream(undefined);
return;
}
- setSelectedStream(state.clientStreams[0]?.id);
+ if (!explicitDeselect.current) {
+ setSelectedStream(state.clientStreams[0]?.id);
+ }
}, [state.clientStreams, selectedStream, state.hostStream]);
+ React.useEffect(() => {
+ if (!showStrip) {
+ state.clientStreams
+ .filter((cs) => cs.id !== selectedStream)
+ .forEach((cs) => unsubscribeStream(cs.id));
+ }
+ }, [state.clientStreams, showStrip, selectedStream, unsubscribeStream]);
+
const stream =
selectedStream === HostStream
? state.hostStream
@@ -214,11 +235,30 @@ export const Room = ({
)}
{stream ? (
-
+ <>
+
+ {controlVisible && (
+
+ {
+ if (!showStrip && typeof selectedStream === 'string') {
+ unsubscribeStream(selectedStream);
+ }
+ explicitDeselect.current = true;
+ setSelectedStream(undefined);
+ }}
+ sx={{position: 'absolute', top: 8, right: 8, zIndex: 25, bgcolor: 'rgba(0,0,0,0.4)'}}
+ >
+
+
+
+ )}
+ >
) : (
+
+ {
+ if (showStrip) {
+ state.clientStreams
+ .filter((cs) => cs.id !== selectedStream)
+ .forEach((cs) => unsubscribeStream(cs.id));
+ setShowStrip(false);
+ } else {
+ const subscribedIDs = new Set(
+ state.clientStreams.map((cs) => cs.peer_id)
+ );
+ state.users
+ .filter(
+ (u) =>
+ u.streaming &&
+ !u.you &&
+ !subscribedIDs.has(u.id)
+ )
+ .forEach((u) => subscribeStream(u.id));
+ setShowStrip(true);
+ }
+ }}
+ size="large"
+ color={showStrip ? 'primary' : 'inherit'}
+ >
+ {showStrip ? (
+
+ ) : (
+
+ )}
+
+
+
setOpen(true)} size="large">
@@ -292,58 +366,60 @@ export const Room = ({
)}
-
- {state.clientStreams
- .filter(({id}) => id !== selectedStream)
- .map((client) => {
- return (
-
setSelectedStream(client.id)}
- >
-
- );
- })}
- {state.hostStream && selectedStream !== HostStream && (
-
setSelectedStream(HostStream)}
- >
-
-
+
+ {state.users.find(({id}) => client.peer_id === id)?.name ??
+ 'unknown'}
+
+
+ );
+ })}
+ {state.hostStream && selectedStream !== HostStream && (
+
setSelectedStream(HostStream)}
>
- You
-
-
- )}
-
-
+
+
+ You
+
+
+ )}
+
+ )}
+
);
};
diff --git a/ui/src/message.ts b/ui/src/message.ts
index c15f1c7b..5392914d 100644
--- a/ui/src/message.ts
+++ b/ui/src/message.ts
@@ -41,6 +41,7 @@ export interface P2PSession {
id: string;
peer: string;
iceServers: ICEServer[];
+ mode?: 'sfu'; // present only in hostsession when SCREEGO_SFU_MODE=true; absent means p2p
}
export interface ICEServer {
@@ -83,6 +84,8 @@ export type StopShare = Typed<{}, 'stopshare'>;
export type RoomCreate = Typed;
export type JoinRoom = Typed;
export type EndShare = Typed;
+export type Subscribe = Typed<{id: string}, 'subscribe'>;
+export type Unsubscribe = Typed<{id: string}, 'unsubscribe'>;
export type IncomingMessage =
| Room
@@ -104,4 +107,6 @@ export type OutgoingMessage =
| HostOffer
| StopShare
| ClientAnswer
- | StartSharing;
+ | StartSharing
+ | Subscribe
+ | Unsubscribe;
diff --git a/ui/src/useRoom.ts b/ui/src/useRoom.ts
index 5385b9b1..15b04eea 100644
--- a/ui/src/useRoom.ts
+++ b/ui/src/useRoom.ts
@@ -34,6 +34,8 @@ export interface UseRoom {
share: () => void;
setName: (name: string) => void;
stopShare: () => void;
+ subscribeStream: (userID: string) => void;
+ unsubscribeStream: (sessionID: string) => void;
}
const relayConfig: Partial =
@@ -165,7 +167,9 @@ export const useRoom = (config: UIConfig): UseRoom => {
const conn = React.useRef(undefined);
const host = React.useRef>({});
const client = React.useRef>({});
+ const clientPeerID = React.useRef>({});
const stream = React.useRef(undefined);
+ const hostSID = React.useRef(undefined); // SFU: session ID for the upload PC
const [state, setState] = React.useState(false);
@@ -205,24 +209,55 @@ export const useRoom = (config: UIConfig): UseRoom => {
if (!stream.current) {
return;
}
- hostSession({
- sid: event.payload.id,
- stream: stream.current!,
- ice: event.payload.iceServers,
- send,
- done: () => delete host.current[event.payload.id],
- }).then((peer) => {
- host.current[event.payload.id] = peer;
- });
+ if (event.payload.mode === 'sfu') {
+ // SFU mode: create upload-only PC, add tracks, wait for server's offer.
+ const sid = event.payload.id;
+ hostSID.current = sid;
+ const pc = new RTCPeerConnection({
+ ...relayConfig,
+ iceServers: event.payload.iceServers,
+ });
+ stream.current.getTracks().forEach((track) =>
+ pc.addTrack(track, stream.current!)
+ );
+ pc.onicecandidate = (ev) => {
+ if (!ev.candidate) return;
+ send({type: 'hostice', payload: {sid, value: ev.candidate}});
+ };
+ pc.onconnectionstatechange = () => {
+ if (
+ pc.connectionState === 'closed' ||
+ pc.connectionState === 'disconnected' ||
+ pc.connectionState === 'failed'
+ ) {
+ pc.close();
+ delete host.current[sid];
+ }
+ };
+ host.current[sid] = pc;
+ } else {
+ // P2P mode: host creates and sends the offer.
+ hostSession({
+ sid: event.payload.id,
+ stream: stream.current!,
+ ice: event.payload.iceServers,
+ send,
+ done: () => delete host.current[event.payload.id],
+ }).then((peer) => {
+ host.current[event.payload.id] = peer;
+ });
+ }
return;
case 'clientsession':
const {id: sid, peer} = event.payload;
+ clientPeerID.current[sid] = peer;
clientSession({
sid,
send,
ice: event.payload.iceServers,
done: () => {
delete client.current[sid];
+ delete clientPeerID.current[sid];
setState((current) =>
current
? {
@@ -262,18 +297,46 @@ export const useRoom = (config: UIConfig): UseRoom => {
return;
case 'hostoffer':
(async () => {
- await client.current[event.payload.sid]?.setRemoteDescription(
- event.payload.value
- );
- const answer =
- await client.current[event.payload.sid]?.createAnswer();
- await client.current[event.payload.sid]?.setLocalDescription(
- answer
- );
- send({
- type: 'clientanswer',
- payload: {sid: event.payload.sid, value: answer},
- });
+ const sid = event.payload.sid;
+ if (host.current[sid]) {
+ // SFU mode: server sent its recvonly offer to the sharer browser.
+ const pc = host.current[sid];
+ await pc.setRemoteDescription(event.payload.value);
+ // Apply codec preference on the sender transceiver once remote
+ // description is set (transceiver direction is known at this point).
+ const preferCodec = resolveCodecPlaceholder(loadSettings().preferCodec);
+ if (preferCodec) {
+ const transceiver = pc
+ .getTransceivers()
+ .find((t) => t.sender?.track === stream.current?.getVideoTracks()[0]);
+ if (transceiver && 'setCodecPreferences' in transceiver) {
+ const exactMatch: RTCRtpCodec[] = [];
+ const mimeMatch: RTCRtpCodec[] = [];
+ const others: RTCRtpCodec[] = [];
+ RTCRtpReceiver.getCapabilities('video')?.codecs.forEach((codec) => {
+ if (codec.mimeType === preferCodec.mimeType) {
+ if (codec.sdpFmtpLine === preferCodec.sdpFmtpLine) {
+ exactMatch.push(codec);
+ } else {
+ mimeMatch.push(codec);
+ }
+ } else {
+ others.push(codec);
+ }
+ });
+ transceiver.setCodecPreferences([...exactMatch, ...mimeMatch, ...others]);
+ }
+ }
+ const answer = await pc.createAnswer();
+ await pc.setLocalDescription(answer);
+ send({type: 'clientanswer', payload: {sid, value: answer}});
+ } else {
+ // P2P mode (or SFU viewer): relay host's offer to the viewer PC.
+ await client.current[sid]?.setRemoteDescription(event.payload.value);
+ const answer = await client.current[sid]?.createAnswer();
+ await client.current[sid]?.setLocalDescription(answer);
+ send({type: 'clientanswer', payload: {sid, value: answer}});
+ }
})();
return;
case 'hostice':
@@ -282,6 +345,7 @@ export const useRoom = (config: UIConfig): UseRoom => {
case 'endshare':
client.current[event.payload]?.close();
host.current[event.payload]?.close();
+ delete clientPeerID.current[event.payload];
setState((current) =>
current
? {
@@ -362,10 +426,16 @@ export const useRoom = (config: UIConfig): UseRoom => {
};
const stopShare = async () => {
- Object.values(host.current).forEach((peer) => {
- peer.close();
- });
- host.current = {};
+ if (hostSID.current) {
+ // SFU mode: only the upload PC exists in host.current.
+ host.current[hostSID.current]?.close();
+ delete host.current[hostSID.current];
+ hostSID.current = undefined;
+ } else {
+ // P2P mode: one host PC per viewer.
+ Object.values(host.current).forEach((peer) => peer.close());
+ host.current = {};
+ }
stream.current?.getTracks().forEach((track) => track.stop());
stream.current = undefined;
conn.current?.send(JSON.stringify({type: 'stopshare', payload: {}}));
@@ -376,6 +446,25 @@ export const useRoom = (config: UIConfig): UseRoom => {
conn.current?.send(JSON.stringify({type: 'name', payload: {username: name}}));
};
+ const subscribeStream = React.useCallback((userID: string): void => {
+ conn.current?.send(JSON.stringify({type: 'subscribe', payload: {id: userID}}));
+ }, []);
+
+ const unsubscribeStream = React.useCallback((sessionID: string): void => {
+ const peerID = clientPeerID.current[sessionID];
+ client.current[sessionID]?.close();
+ delete client.current[sessionID];
+ delete clientPeerID.current[sessionID];
+ setState((current) =>
+ current
+ ? {...current, clientStreams: current.clientStreams.filter(({id}) => id !== sessionID)}
+ : current
+ );
+ if (peerID) {
+ conn.current?.send(JSON.stringify({type: 'unsubscribe', payload: {id: peerID}}));
+ }
+ }, []);
+
React.useEffect(() => {
if (roomID) {
const create = getFromURL('create') === 'true';
@@ -401,5 +490,5 @@ export const useRoom = (config: UIConfig): UseRoom => {
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
- return {state, room, share, stopShare, setName};
+ return {state, room, share, stopShare, setName, subscribeStream, unsubscribeStream};
};
diff --git a/ws/event_clientanswer.go b/ws/event_clientanswer.go
index 19ea7671..934167d2 100644
--- a/ws/event_clientanswer.go
+++ b/ws/event_clientanswer.go
@@ -1,8 +1,10 @@
package ws
import (
+ "encoding/json"
"fmt"
+ "github.com/pion/webrtc/v4"
"github.com/rs/zerolog/log"
"github.com/screego/server/ws/outgoing"
)
@@ -21,6 +23,40 @@ func (e *ClientAnswer) Execute(rooms *Rooms, current ClientInfo) error {
return err
}
+ if rooms.config.SFUMode {
+ var sd webrtc.SessionDescription
+ if err := json.Unmarshal(e.Value, &sd); err != nil {
+ return fmt.Errorf("SFU: unmarshal answer SDP: %w", err)
+ }
+
+ // Sharer answering the server's recvonly offer.
+ if h := room.sfuHostBySession(e.SID); h != nil {
+ if room.Users[current.ID] == nil || !room.Users[current.ID].Streaming {
+ return fmt.Errorf("permission denied: not the sharing host")
+ }
+ if h.PC == nil {
+ log.Debug().Msg("SFU: clientanswer for host but PC is nil")
+ return nil
+ }
+ return h.PC.SetRemoteDescription(sd)
+ }
+
+ // Viewer answering the server's sendonly offer.
+ session, ok := room.Sessions[e.SID]
+ if !ok {
+ log.Debug().Str("id", e.SID.String()).Msg("SFU: unknown session for clientanswer")
+ return nil
+ }
+ if session.Client != current.ID {
+ return fmt.Errorf("permission denied for session %s", e.SID)
+ }
+ if session.ViewerPC == nil {
+ log.Debug().Str("id", e.SID.String()).Msg("SFU: clientanswer but ViewerPC is nil")
+ return nil
+ }
+ return session.ViewerPC.SetRemoteDescription(sd)
+ }
+
session, ok := room.Sessions[e.SID]
if !ok {
diff --git a/ws/event_clientice.go b/ws/event_clientice.go
index fc76068e..22181796 100644
--- a/ws/event_clientice.go
+++ b/ws/event_clientice.go
@@ -1,8 +1,10 @@
package ws
import (
+ "encoding/json"
"fmt"
+ "github.com/pion/webrtc/v4"
"github.com/rs/zerolog/log"
"github.com/screego/server/ws/outgoing"
)
@@ -21,6 +23,26 @@ func (e *ClientICE) Execute(rooms *Rooms, current ClientInfo) error {
return err
}
+ if rooms.config.SFUMode {
+ // Viewer sends ICE for its download connection to the server's ViewerPC.
+ session, ok := room.Sessions[e.SID]
+ if !ok {
+ log.Debug().Str("id", e.SID.String()).Msg("SFU: unknown session for clientice")
+ return nil
+ }
+ if session.Client != current.ID {
+ return fmt.Errorf("permission denied for session %s", e.SID)
+ }
+ if session.ViewerPC == nil {
+ return nil
+ }
+ var init webrtc.ICECandidateInit
+ if err := json.Unmarshal(e.Value, &init); err != nil {
+ return fmt.Errorf("SFU: unmarshal viewer ICE candidate: %w", err)
+ }
+ return session.ViewerPC.AddICECandidate(init)
+ }
+
session, ok := room.Sessions[e.SID]
if !ok {
diff --git a/ws/event_create.go b/ws/event_create.go
index 2c10b5a5..80a1284d 100644
--- a/ws/event_create.go
+++ b/ws/event_create.go
@@ -63,6 +63,7 @@ func (e *Create) Execute(rooms *Rooms, current ClientInfo) error {
CloseOnOwnerLeave: e.CloseOnOwnerLeave,
Mode: e.Mode,
Sessions: map[xid.ID]*RoomSession{},
+ SFUHosts: map[xid.ID]*SFUHost{},
Users: map[xid.ID]*User{
current.ID: {
ID: current.ID,
diff --git a/ws/event_disconnected.go b/ws/event_disconnected.go
index 67b236ea..b3597056 100644
--- a/ws/event_disconnected.go
+++ b/ws/event_disconnected.go
@@ -42,6 +42,10 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
delete(room.Users, current.ID)
usersLeftTotal.Inc()
+ if rooms.config.SFUMode && user.Streaming {
+ room.closeSFUHost(current.ID)
+ }
+
for id, session := range room.Sessions {
if bytes.Equal(session.Client.Bytes(), current.ID.Bytes()) {
host, ok := room.Users[session.Host]
diff --git a/ws/event_hostice.go b/ws/event_hostice.go
index 93189be6..34566b65 100644
--- a/ws/event_hostice.go
+++ b/ws/event_hostice.go
@@ -1,8 +1,10 @@
package ws
import (
+ "encoding/json"
"fmt"
+ "github.com/pion/webrtc/v4"
"github.com/rs/zerolog/log"
"github.com/screego/server/ws/outgoing"
)
@@ -21,6 +23,26 @@ func (e *HostICE) Execute(rooms *Rooms, current ClientInfo) error {
return err
}
+ if rooms.config.SFUMode {
+ // Sharer sends ICE for its upload connection to the server.
+ h := room.sfuHostBySession(e.SID)
+ if h == nil {
+ log.Debug().Str("id", e.SID.String()).Msg("SFU: unexpected hostice for unknown session")
+ return nil
+ }
+ if room.Users[current.ID] == nil || !room.Users[current.ID].Streaming {
+ return fmt.Errorf("permission denied: not the sharing host")
+ }
+ if h.PC == nil {
+ return nil
+ }
+ var init webrtc.ICECandidateInit
+ if err := json.Unmarshal(e.Value, &init); err != nil {
+ return fmt.Errorf("SFU: unmarshal host ICE candidate: %w", err)
+ }
+ return h.PC.AddICECandidate(init)
+ }
+
session, ok := room.Sessions[e.SID]
if !ok {
diff --git a/ws/event_hostoffer.go b/ws/event_hostoffer.go
index 36a86a42..58a148ef 100644
--- a/ws/event_hostoffer.go
+++ b/ws/event_hostoffer.go
@@ -21,6 +21,12 @@ func (e *HostOffer) Execute(rooms *Rooms, current ClientInfo) error {
return err
}
+ if rooms.config.SFUMode {
+ // In SFU mode the server originates all offers; browser-sent hostoffer is unexpected.
+ log.Debug().Str("id", e.SID.String()).Msg("SFU: ignoring browser-originated hostoffer")
+ return nil
+ }
+
session, ok := room.Sessions[e.SID]
if !ok {
diff --git a/ws/event_join.go b/ws/event_join.go
index 8e14f0bd..cbff5f17 100644
--- a/ws/event_join.go
+++ b/ws/event_join.go
@@ -1,7 +1,10 @@
package ws
import (
+ "errors"
"fmt"
+
+ "github.com/screego/server/config"
)
func init() {
@@ -24,6 +27,10 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
if !ok {
return fmt.Errorf("room with id %s does not exist", e.ID)
}
+
+ if rooms.config.AuthMode == config.AuthModeAll && !current.Authenticated {
+ return errors.New("you need to login")
+ }
name := e.UserName
if current.Authenticated {
name = current.AuthenticatedUser
@@ -53,7 +60,20 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
if current.ID == user.ID || !user.Streaming {
continue
}
- room.newSession(user.ID, current.ID, rooms, v4, v6)
+ if rooms.config.SFUMode {
+ h, ok := room.SFUHosts[user.ID]
+ if !ok {
+ continue
+ }
+ if h.Track != nil {
+ createViewerPC(room, rooms, user.ID, current.ID, v4, v6)
+ } else {
+ // Host PC exists but OnTrack hasn't fired yet; defer until the track arrives.
+ h.Pending = append(h.Pending, current.ID)
+ }
+ } else {
+ room.newSession(user.ID, current.ID, rooms, v4, v6)
+ }
}
return nil
diff --git a/ws/event_sfu_internal.go b/ws/event_sfu_internal.go
new file mode 100644
index 00000000..89bbe25a
--- /dev/null
+++ b/ws/event_sfu_internal.go
@@ -0,0 +1,178 @@
+package ws
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/pion/rtcp"
+ "github.com/pion/webrtc/v4"
+ "github.com/rs/xid"
+ "github.com/rs/zerolog/log"
+ "github.com/screego/server/ws/outgoing"
+)
+
+// SFUHostTrack is posted by the pion OnTrack callback when a sharer's video arrives.
+// Running in the main event loop, it is safe to mutate Room state.
+type SFUHostTrack struct {
+ RoomID string
+ SharerID xid.ID // user ID of the sharer
+ Track *webrtc.TrackRemote
+ Receiver *webrtc.RTPReceiver
+}
+
+func (e *SFUHostTrack) Execute(rooms *Rooms, _ ClientInfo) error {
+ room, ok := rooms.Rooms[e.RoomID]
+ if !ok {
+ return nil
+ }
+
+ h, ok := room.SFUHosts[e.SharerID]
+ if !ok {
+ return nil
+ }
+
+ shared, err := webrtc.NewTrackLocalStaticRTP(
+ e.Track.Codec().RTPCodecCapability, "video", "screego-sfu")
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: create shared track")
+ return nil
+ }
+ h.Track = shared
+ h.SSRC = uint32(e.Track.SSRC())
+
+ ctx, cancel := context.WithCancel(context.Background())
+ h.Cancel = cancel
+
+ startRTPForward(ctx, e.Track, e.Receiver, shared)
+
+ // Connect any viewers who joined before the track arrived.
+ pending := h.Pending
+ h.Pending = nil
+
+ v4addr, v6addr, err := rooms.config.TurnIPProvider.Get()
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: get TURN IPs for pending viewers")
+ return nil
+ }
+
+ for _, viewerID := range pending {
+ if _, ok := room.Users[viewerID]; !ok {
+ continue
+ }
+ createViewerPC(room, rooms, e.SharerID, viewerID, v4addr, v6addr)
+ }
+
+ return nil
+}
+
+// SFUIceCandidate is posted by pion OnICECandidate callbacks and routes the server's
+// ICE candidates to the correct browser peer.
+//
+// ICE routing (naming inherited from P2P era):
+// - server → sharer browser : outgoing.ClientICE (browser routes via host.current[sid])
+// - server → viewer browser : outgoing.HostICE (browser routes via client.current[sid])
+type SFUIceCandidate struct {
+ RoomID string
+ SessionID xid.ID
+ SharerID xid.ID // user ID of the sharer (populated for both host and viewer candidates)
+ IsHost bool
+ Candidate *webrtc.ICECandidate
+}
+
+func (e *SFUIceCandidate) Execute(rooms *Rooms, _ ClientInfo) error {
+ room, ok := rooms.Rooms[e.RoomID]
+ if !ok {
+ return nil
+ }
+
+ raw, err := json.Marshal(e.Candidate.ToJSON())
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: marshal ICE candidate")
+ return nil
+ }
+
+ if e.IsHost {
+ // Send to the sharer browser via clientice (routes to host.current[sid] in browser).
+ if sharer, ok := room.Users[e.SharerID]; ok {
+ sharer.WriteTimeout(outgoing.ClientICE{SID: e.SessionID, Value: raw})
+ }
+ } else {
+ session, ok := room.Sessions[e.SessionID]
+ if !ok {
+ return nil
+ }
+ viewer, ok := room.Users[session.Client]
+ if !ok {
+ return nil
+ }
+ // hostice → browser routes to client.current[sid]
+ viewer.WriteTimeout(outgoing.HostICE{SID: e.SessionID, Value: raw})
+ }
+ return nil
+}
+
+// SFUSendPLI is posted by viewer PC callbacks to request a keyframe from the sharer.
+// Safe to call from any goroutine — always executed inside the main event loop.
+type SFUSendPLI struct {
+ RoomID string
+ SharerID xid.ID
+}
+
+func (e *SFUSendPLI) Execute(rooms *Rooms, _ ClientInfo) error {
+ room, ok := rooms.Rooms[e.RoomID]
+ if !ok {
+ return nil
+ }
+ h, ok := room.SFUHosts[e.SharerID]
+ if !ok || h.PC == nil || h.SSRC == 0 {
+ return nil
+ }
+ return h.PC.WriteRTCP([]rtcp.Packet{
+ &rtcp.PictureLossIndication{MediaSSRC: h.SSRC},
+ })
+}
+
+// SFUConnectionFailed is posted when a pion PC enters failed/disconnected state.
+type SFUConnectionFailed struct {
+ RoomID string
+ SharerID xid.ID // user ID of the sharer (always set)
+ IsHost bool
+ SessionID xid.ID // only set when IsHost=false
+}
+
+func (e *SFUConnectionFailed) Execute(rooms *Rooms, _ ClientInfo) error {
+ room, ok := rooms.Rooms[e.RoomID]
+ if !ok {
+ return nil
+ }
+
+ if e.IsHost {
+ room.closeSFUHost(e.SharerID)
+ // Close all viewer sessions for this sharer.
+ for id, session := range room.Sessions {
+ if session.Host != e.SharerID {
+ continue
+ }
+ if viewer, ok := room.Users[session.Client]; ok {
+ viewer.WriteTimeout(outgoing.EndShare(id))
+ }
+ room.closeSession(rooms, id)
+ }
+ if sharer, ok := room.Users[e.SharerID]; ok {
+ sharer.Streaming = false
+ }
+ room.notifyInfoChanged()
+ return nil
+ }
+
+ // A single viewer's PC failed.
+ session, ok := room.Sessions[e.SessionID]
+ if !ok {
+ return nil
+ }
+ if viewer, ok := room.Users[session.Client]; ok {
+ viewer.WriteTimeout(outgoing.EndShare(e.SessionID))
+ }
+ room.closeSession(rooms, e.SessionID)
+ return nil
+}
diff --git a/ws/event_share.go b/ws/event_share.go
index f76296f8..8e2b01e3 100644
--- a/ws/event_share.go
+++ b/ws/event_share.go
@@ -1,5 +1,7 @@
package ws
+import "github.com/rs/xid"
+
func init() {
register("share", func() Event {
return &StartShare{}
@@ -21,11 +23,27 @@ func (e *StartShare) Execute(rooms *Rooms, current ClientInfo) error {
return err
}
- for _, user := range room.Users {
- if current.ID == user.ID {
- continue
+ if rooms.config.SFUMode {
+ if room.SFUHosts == nil {
+ room.SFUHosts = make(map[xid.ID]*SFUHost)
+ }
+ // Collect all other users as pending viewers for this sharer.
+ // SFUHostTrack.Execute will create their ViewerPCs once the track arrives.
+ pending := make([]xid.ID, 0, len(room.Users)-1)
+ for _, user := range room.Users {
+ if user.ID != current.ID {
+ pending = append(pending, user.ID)
+ }
+ }
+ room.SFUHosts[current.ID] = &SFUHost{Pending: pending}
+ createHostPC(room, rooms, current.ID, v4, v6)
+ } else {
+ for _, user := range room.Users {
+ if current.ID == user.ID {
+ continue
+ }
+ room.newSession(current.ID, user.ID, rooms, v4, v6)
}
- room.newSession(current.ID, user.ID, rooms, v4, v6)
}
room.notifyInfoChanged()
diff --git a/ws/event_stop_share.go b/ws/event_stop_share.go
index 1760c4d2..44943911 100644
--- a/ws/event_stop_share.go
+++ b/ws/event_stop_share.go
@@ -21,6 +21,11 @@ func (e *StopShare) Execute(rooms *Rooms, current ClientInfo) error {
}
room.Users[current.ID].Streaming = false
+
+ if rooms.config.SFUMode {
+ room.closeSFUHost(current.ID)
+ }
+
for id, session := range room.Sessions {
if bytes.Equal(session.Host.Bytes(), current.ID.Bytes()) {
client, ok := room.Users[session.Client]
diff --git a/ws/event_subscribe.go b/ws/event_subscribe.go
new file mode 100644
index 00000000..2ce1f247
--- /dev/null
+++ b/ws/event_subscribe.go
@@ -0,0 +1,50 @@
+package ws
+
+import (
+ "fmt"
+
+ "github.com/rs/xid"
+)
+
+func init() {
+ register("subscribe", func() Event {
+ return &Subscribe{}
+ })
+}
+
+type Subscribe struct {
+ SharerID xid.ID `json:"id"`
+}
+
+func (e *Subscribe) Execute(rooms *Rooms, current ClientInfo) error {
+ room, err := rooms.CurrentRoom(current)
+ if err != nil {
+ return err
+ }
+
+ sharer, ok := room.Users[e.SharerID]
+ if !ok || !sharer.Streaming {
+ return fmt.Errorf("user %s is not streaming", e.SharerID)
+ }
+
+ v4, v6, err := rooms.config.TurnIPProvider.Get()
+ if err != nil {
+ return err
+ }
+
+ if rooms.config.SFUMode {
+ h, ok := room.SFUHosts[e.SharerID]
+ if !ok {
+ return fmt.Errorf("no SFU host for sharer %s", e.SharerID)
+ }
+ if h.Track != nil {
+ createViewerPC(room, rooms, e.SharerID, current.ID, v4, v6)
+ } else {
+ h.Pending = append(h.Pending, current.ID)
+ }
+ } else {
+ room.newSession(e.SharerID, current.ID, rooms, v4, v6)
+ }
+
+ return nil
+}
diff --git a/ws/event_unsubscribe.go b/ws/event_unsubscribe.go
new file mode 100644
index 00000000..fb8f8956
--- /dev/null
+++ b/ws/event_unsubscribe.go
@@ -0,0 +1,54 @@
+package ws
+
+import (
+ "github.com/rs/xid"
+ "github.com/screego/server/ws/outgoing"
+)
+
+func init() {
+ register("unsubscribe", func() Event {
+ return &Unsubscribe{}
+ })
+}
+
+type Unsubscribe struct {
+ SharerID xid.ID `json:"id"`
+}
+
+func (e *Unsubscribe) Execute(rooms *Rooms, current ClientInfo) error {
+ room, err := rooms.CurrentRoom(current)
+ if err != nil {
+ return err
+ }
+
+ if rooms.config.SFUMode {
+ // Remove from Pending if the track hadn't arrived yet.
+ if h, ok := room.SFUHosts[e.SharerID]; ok {
+ filtered := h.Pending[:0]
+ for _, id := range h.Pending {
+ if id != current.ID {
+ filtered = append(filtered, id)
+ }
+ }
+ h.Pending = filtered
+ }
+ }
+
+ sid, session, ok := room.sessionByPeers(e.SharerID, current.ID)
+ if !ok {
+ // Still pending (track not arrived yet); removal above is sufficient.
+ return nil
+ }
+
+ if session.ViewerPC != nil && rooms.config.SFUMode {
+ _ = session.ViewerPC.Close()
+ } else if !rooms.config.SFUMode {
+ // P2P: notify the sharer so it closes its host PC for this viewer.
+ if sharer, ok := room.Users[e.SharerID]; ok {
+ sharer.WriteTimeout(outgoing.EndShare(sid))
+ }
+ }
+
+ room.closeSession(rooms, sid)
+ return nil
+}
diff --git a/ws/outgoing/messages.go b/ws/outgoing/messages.go
index bb66a6bc..4eb74120 100644
--- a/ws/outgoing/messages.go
+++ b/ws/outgoing/messages.go
@@ -32,6 +32,7 @@ type HostSession struct {
ID xid.ID `json:"id"`
Peer xid.ID `json:"peer"`
ICEServers []ICEServer `json:"iceServers"`
+ Mode string `json:"mode,omitempty"` // "sfu" in SFU mode; absent means p2p
}
func (HostSession) Type() string {
diff --git a/ws/room.go b/ws/room.go
index 758ea017..4d206afe 100644
--- a/ws/room.go
+++ b/ws/room.go
@@ -1,11 +1,13 @@
package ws
import (
+ "context"
"fmt"
"net"
"sort"
"time"
+ "github.com/pion/webrtc/v4"
"github.com/rs/xid"
"github.com/rs/zerolog/log"
"github.com/screego/server/config"
@@ -20,12 +22,26 @@ const (
ConnectionTURN ConnectionMode = config.AuthModeTurn
)
+// SFUHost holds server-side state for one active sharer in SFU mode.
+type SFUHost struct {
+ PC *webrtc.PeerConnection
+ Cancel context.CancelFunc
+ SessionID xid.ID
+ SSRC uint32
+ Track *webrtc.TrackLocalStaticRTP
+ Pending []xid.ID // viewers waiting for this sharer's track to arrive
+}
+
type Room struct {
ID string
CloseOnOwnerLeave bool
Mode ConnectionMode
Users map[xid.ID]*User
Sessions map[xid.ID]*RoomSession
+
+ // SFUHosts holds per-sharer SFU state; keyed by sharer user ID.
+ // Only populated when SCREEGO_SFU_MODE=true. Supports multiple simultaneous sharers.
+ SFUHosts map[xid.ID]*SFUHost
}
const (
@@ -83,6 +99,9 @@ func (r *Rooms) addresses(prefix string, v4, v6 net.IP, tcp bool) (result []stri
}
func (r *Room) closeSession(rooms *Rooms, id xid.ID) {
+ if session, ok := r.Sessions[id]; ok && session.ViewerPC != nil {
+ _ = session.ViewerPC.Close()
+ }
if r.Mode == ConnectionTURN {
rooms.turnServer.Disallow(id.String() + "host")
rooms.turnServer.Disallow(id.String() + "client")
@@ -91,9 +110,42 @@ func (r *Room) closeSession(rooms *Rooms, id xid.ID) {
sessionClosedTotal.Inc()
}
+// sfuHostBySession returns the SFUHost whose SessionID matches sid, or nil.
+func (r *Room) sfuHostBySession(sid xid.ID) *SFUHost {
+ for _, h := range r.SFUHosts {
+ if h.SessionID == sid {
+ return h
+ }
+ }
+ return nil
+}
+
+// closeSFUHost tears down one sharer's SFU state.
+func (r *Room) closeSFUHost(sharerID xid.ID) {
+ h, ok := r.SFUHosts[sharerID]
+ if !ok {
+ return
+ }
+ if h.Cancel != nil {
+ h.Cancel()
+ }
+ if h.PC != nil {
+ _ = h.PC.Close()
+ }
+ delete(r.SFUHosts, sharerID)
+}
+
+// closeAllSFUHosts tears down all active sharers' SFU state (used on room close).
+func (r *Room) closeAllSFUHosts() {
+ for id := range r.SFUHosts {
+ r.closeSFUHost(id)
+ }
+}
+
type RoomSession struct {
- Host xid.ID
- Client xid.ID
+ Host xid.ID
+ Client xid.ID
+ ViewerPC *webrtc.PeerConnection // SFU only: server-side sendonly PC for this viewer
}
func (r *Room) notifyInfoChanged() {
@@ -140,6 +192,16 @@ type User struct {
_write chan<- outgoing.Message
}
+// sessionByPeers finds the session ID and session for a given (host, client) pair.
+func (r *Room) sessionByPeers(host, client xid.ID) (xid.ID, *RoomSession, bool) {
+ for id, s := range r.Sessions {
+ if s.Host == host && s.Client == client {
+ return id, s, true
+ }
+ }
+ return xid.ID{}, nil, false
+}
+
func (u *User) WriteTimeout(msg outgoing.Message) {
writeTimeout(u._write, msg)
}
diff --git a/ws/rooms.go b/ws/rooms.go
index 8ebded32..60905c49 100644
--- a/ws/rooms.go
+++ b/ws/rooms.go
@@ -8,6 +8,8 @@ import (
"time"
"github.com/gorilla/websocket"
+ "github.com/pion/interceptor"
+ "github.com/pion/webrtc/v4"
"github.com/rs/xid"
"github.com/rs/zerolog/log"
"github.com/screego/server/auth"
@@ -16,7 +18,19 @@ import (
"github.com/screego/server/util"
)
+func newWebRTCAPI() *webrtc.API {
+ m := &webrtc.MediaEngine{}
+ _ = m.RegisterDefaultCodecs()
+ i := &interceptor.Registry{}
+ _ = webrtc.RegisterDefaultInterceptors(m, i)
+ return webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
+}
+
func NewRooms(tServer turn.Server, users *auth.Users, conf config.Config) *Rooms {
+ var api *webrtc.API
+ if conf.SFUMode {
+ api = newWebRTCAPI()
+ }
return &Rooms{
Rooms: map[string]*Room{},
Incoming: make(chan ClientMessage),
@@ -25,6 +39,7 @@ func NewRooms(tServer turn.Server, users *auth.Users, conf config.Config) *Rooms
users: users,
config: conf,
r: rand.New(rand.NewSource(time.Now().Unix())),
+ webrtcAPI: api,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
@@ -45,6 +60,7 @@ func NewRooms(tServer turn.Server, users *auth.Users, conf config.Config) *Rooms
type Rooms struct {
turnServer turn.Server
+ webrtcAPI *webrtc.API
Rooms map[string]*Room
Incoming chan ClientMessage
upgrader websocket.Upgrader
@@ -136,6 +152,7 @@ func (r *Rooms) closeRoom(roomID string) {
for id := range room.Sessions {
room.closeSession(r, id)
}
+ room.closeAllSFUHosts()
delete(r.Rooms, roomID)
roomsClosedTotal.Inc()
diff --git a/ws/sfu.go b/ws/sfu.go
new file mode 100644
index 00000000..7a3a4b36
--- /dev/null
+++ b/ws/sfu.go
@@ -0,0 +1,276 @@
+package ws
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+ "net"
+
+ "github.com/pion/rtcp"
+ "github.com/pion/webrtc/v4"
+ "github.com/rs/xid"
+ "github.com/rs/zerolog/log"
+ "github.com/screego/server/ws/outgoing"
+)
+
+// Compile-time checks: SFU internal events implement the Event interface.
+var _ Event = (*SFUHostTrack)(nil)
+var _ Event = (*SFUIceCandidate)(nil)
+var _ Event = (*SFUConnectionFailed)(nil)
+var _ Event = (*SFUSendPLI)(nil)
+
+// createHostPC creates a server-side recvonly PeerConnection for the sharing user.
+// Sends hostsession + hostoffer to the sharer browser. Pion callbacks post internal SFU
+// events into rooms.Incoming — they never touch Room state directly.
+func createHostPC(room *Room, rooms *Rooms, sharerID xid.ID, v4addr, v6addr net.IP) {
+ iceServers, pioncfg := buildICEServers(room, rooms, xid.New().String()+"host", v4addr, v6addr, room.Users[sharerID].Addr)
+ sid := xid.New()
+
+ h, ok := room.SFUHosts[sharerID]
+ if !ok {
+ return
+ }
+ h.SessionID = sid
+
+ pc, err := rooms.webrtcAPI.NewPeerConnection(pioncfg)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: create host PC")
+ return
+ }
+ h.PC = pc
+
+ if _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
+ webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
+ log.Error().Err(err).Msg("SFU: add recvonly transceiver")
+ _ = pc.Close()
+ h.PC = nil
+ return
+ }
+
+ roomID := room.ID
+ pc.OnICECandidate(func(c *webrtc.ICECandidate) {
+ if c == nil {
+ return
+ }
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUIceCandidate{
+ RoomID: roomID, SessionID: sid,
+ SharerID: sharerID, IsHost: true, Candidate: c,
+ },
+ }
+ })
+
+ pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUHostTrack{RoomID: roomID, SharerID: sharerID, Track: track, Receiver: receiver},
+ }
+ })
+
+ pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+ if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUConnectionFailed{RoomID: roomID, SharerID: sharerID, IsHost: true},
+ }
+ }
+ })
+
+ offer, err := pc.CreateOffer(nil)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: create host offer")
+ _ = pc.Close()
+ h.PC = nil
+ return
+ }
+ if err = pc.SetLocalDescription(offer); err != nil {
+ log.Error().Err(err).Msg("SFU: set host local description")
+ _ = pc.Close()
+ h.PC = nil
+ return
+ }
+
+ sdpJSON, err := json.Marshal(offer)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: marshal host offer")
+ _ = pc.Close()
+ h.PC = nil
+ return
+ }
+
+ sharer := room.Users[sharerID]
+ sharer.WriteTimeout(outgoing.HostSession{ID: sid, Peer: sharerID, ICEServers: iceServers, Mode: "sfu"})
+ sharer.WriteTimeout(outgoing.HostOffer{SID: sid, Value: sdpJSON})
+}
+
+// createViewerPC creates a server-side sendonly PeerConnection for a viewer and sends
+// clientsession + hostoffer to the viewer browser.
+func createViewerPC(room *Room, rooms *Rooms, sharerID, viewerID xid.ID, v4addr, v6addr net.IP) {
+ iceServers, pioncfg := buildICEServers(room, rooms, xid.New().String()+"client", v4addr, v6addr, room.Users[viewerID].Addr)
+ sid := xid.New()
+
+ pc, err := rooms.webrtcAPI.NewPeerConnection(pioncfg)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: create viewer PC")
+ return
+ }
+
+ roomID := room.ID
+
+ h := room.SFUHosts[sharerID]
+ if h != nil && h.Track != nil {
+ sender, err := pc.AddTrack(h.Track)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: add shared track to viewer PC")
+ _ = pc.Close()
+ return
+ }
+ // Forward PLI from the viewer browser back to the sharer so it re-sends a keyframe.
+ go func() {
+ rtcpBuf := make([]byte, 1500)
+ for {
+ n, _, err := sender.Read(rtcpBuf)
+ if err != nil {
+ return
+ }
+ pkts, err := rtcp.Unmarshal(rtcpBuf[:n])
+ if err != nil {
+ continue
+ }
+ for _, pkt := range pkts {
+ if _, ok := pkt.(*rtcp.PictureLossIndication); ok {
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUSendPLI{RoomID: roomID, SharerID: sharerID},
+ }
+ }
+ }
+ }
+ }()
+ }
+
+ room.Sessions[sid] = &RoomSession{Host: sharerID, Client: viewerID, ViewerPC: pc}
+ sessionCreatedTotal.Inc()
+
+ pc.OnICECandidate(func(c *webrtc.ICECandidate) {
+ if c == nil {
+ return
+ }
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUIceCandidate{
+ RoomID: roomID, SessionID: sid,
+ SharerID: sharerID, IsHost: false, Candidate: c,
+ },
+ }
+ })
+
+ pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+ if state == webrtc.PeerConnectionStateConnected {
+ // New viewer connected — ask sharer for a keyframe so the viewer can start decoding.
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUSendPLI{RoomID: roomID, SharerID: sharerID},
+ }
+ }
+ if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
+ rooms.Incoming <- ClientMessage{
+ SkipConnectedCheck: true,
+ Incoming: &SFUConnectionFailed{RoomID: roomID, SharerID: sharerID, IsHost: false, SessionID: sid},
+ }
+ }
+ })
+
+ offer, err := pc.CreateOffer(nil)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: create viewer offer")
+ _ = pc.Close()
+ delete(room.Sessions, sid)
+ return
+ }
+ if err = pc.SetLocalDescription(offer); err != nil {
+ log.Error().Err(err).Msg("SFU: set viewer local description")
+ _ = pc.Close()
+ delete(room.Sessions, sid)
+ return
+ }
+
+ sdpJSON, err := json.Marshal(offer)
+ if err != nil {
+ log.Error().Err(err).Msg("SFU: marshal viewer offer")
+ _ = pc.Close()
+ delete(room.Sessions, sid)
+ return
+ }
+
+ viewer := room.Users[viewerID]
+ viewer.WriteTimeout(outgoing.ClientSession{ID: sid, Peer: sharerID, ICEServers: iceServers})
+ viewer.WriteTimeout(outgoing.HostOffer{SID: sid, Value: sdpJSON})
+}
+
+// startRTPForward pumps RTP from the sharer's track to all viewers via the shared track
+// and drains RTCP from the receiver so pion's internal buffers don't stall.
+// PLI forwarding (viewer→sharer) is handled separately in createViewerPC via sender.Read().
+func startRTPForward(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver,
+ shared *webrtc.TrackLocalStaticRTP) {
+
+ go func() {
+ buf := make([]byte, 1500)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ n, _, err := track.Read(buf)
+ if err != nil {
+ return
+ }
+ if _, err = shared.Write(buf[:n]); err != nil && err != io.ErrClosedPipe {
+ return
+ }
+ }
+ }()
+
+ // Drain RTCP from the sharer's receiver so pion's internal buffers don't stall.
+ go func() {
+ rtcpBuf := make([]byte, 1500)
+ for {
+ if _, _, err := receiver.Read(rtcpBuf); err != nil {
+ return
+ }
+ }
+ }()
+}
+
+// buildICEServers returns:
+// - outgoing ICE servers to send to the browser (STUN or TURN with credentials)
+// - a pion Configuration for the server-side PeerConnection
+//
+// Pion is the server endpoint — it never needs TURN to reach itself.
+// In STUN mode pion gets the same STUN URL so it can discover its reflexive address.
+// In TURN mode pion gets only STUN (derived from the TURN address) so it discovers
+// its public IP without allocating a relay it will never use.
+func buildICEServers(room *Room, rooms *Rooms, credKey string, v4addr, v6addr, clientAddr net.IP) ([]outgoing.ICEServer, webrtc.Configuration) {
+ var out []outgoing.ICEServer
+ var pionServers []webrtc.ICEServer
+
+ switch room.Mode {
+ case ConnectionSTUN:
+ urls := rooms.addresses("stun", v4addr, v6addr, false)
+ out = []outgoing.ICEServer{{URLs: urls}}
+ pionServers = []webrtc.ICEServer{{URLs: urls}}
+ case ConnectionTURN:
+ name, pw := rooms.turnServer.Credentials(credKey, clientAddr)
+ turnURLs := rooms.addresses("turn", v4addr, v6addr, true)
+ out = []outgoing.ICEServer{{URLs: turnURLs, Username: name, Credential: pw}}
+ // Pion only needs STUN to discover its reflexive address; not TURN.
+ stunURLs := rooms.addresses("stun", v4addr, v6addr, false)
+ if len(stunURLs) > 0 {
+ pionServers = []webrtc.ICEServer{{URLs: stunURLs}}
+ }
+ }
+
+ return out, webrtc.Configuration{ICEServers: pionServers}
+}
diff --git a/ws/sfu_test.go b/ws/sfu_test.go
new file mode 100644
index 00000000..1e48a2d4
--- /dev/null
+++ b/ws/sfu_test.go
@@ -0,0 +1,311 @@
+package ws
+
+import (
+ "context"
+ "net"
+ "testing"
+
+ "github.com/rs/xid"
+ "github.com/screego/server/config"
+ "github.com/screego/server/config/ipdns"
+ "github.com/screego/server/ws/outgoing"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// --- test helpers ---
+
+type mockTurnServer struct {
+ disallowed []string
+}
+
+func (m *mockTurnServer) Credentials(id string, addr net.IP) (string, string) {
+ return "testuser-" + id, "testpass"
+}
+func (m *mockTurnServer) Disallow(username string) {
+ m.disallowed = append(m.disallowed, username)
+}
+
+type staticIPProvider struct{ v4, v6 net.IP }
+
+func (s staticIPProvider) Get() (net.IP, net.IP, error) { return s.v4, s.v6, nil }
+
+var _ ipdns.Provider = staticIPProvider{}
+
+func makeSFURooms() *Rooms {
+ ts := &mockTurnServer{}
+ return &Rooms{
+ Rooms: map[string]*Room{},
+ Incoming: make(chan ClientMessage, 20),
+ connected: map[xid.ID]string{},
+ turnServer: ts,
+ config: config.Config{
+ SFUMode: true,
+ TurnPort: "3478",
+ TurnIPProvider: staticIPProvider{
+ v4: net.ParseIP("127.0.0.1"),
+ },
+ },
+ }
+}
+
+func makeTestRoom(id string, mode ConnectionMode) *Room {
+ return &Room{
+ ID: id,
+ Mode: mode,
+ Users: map[xid.ID]*User{},
+ Sessions: map[xid.ID]*RoomSession{},
+ }
+}
+
+func makeTestUser(streaming bool) (*User, chan outgoing.Message) {
+ ch := make(chan outgoing.Message, 10)
+ u := &User{
+ ID: xid.New(),
+ Streaming: streaming,
+ _write: ch,
+ }
+ return u, ch
+}
+
+// --- closeSFUHost ---
+
+func TestCloseSFUHost_NilSafe(t *testing.T) {
+ room := makeTestRoom("r", ConnectionLocal)
+ // No SFUHosts at all — must not panic.
+ assert.NotPanics(t, func() { room.closeSFUHost(xid.New()) })
+ assert.NotPanics(t, room.closeAllSFUHosts)
+ assert.Empty(t, room.SFUHosts)
+}
+
+func TestCloseSFUHost_CallsCancel(t *testing.T) {
+ room := makeTestRoom("r", ConnectionLocal)
+ sharerID := xid.New()
+ cancelled := false
+ ctx, cancel := context.WithCancel(context.Background())
+ room.SFUHosts = map[xid.ID]*SFUHost{
+ sharerID: {
+ Cancel: func() {
+ cancelled = true
+ cancel()
+ },
+ },
+ }
+ room.closeSFUHost(sharerID)
+ assert.True(t, cancelled)
+ assert.NotContains(t, room.SFUHosts, sharerID)
+ assert.Equal(t, context.Canceled, ctx.Err())
+}
+
+func TestCloseSFUHost_ClearsPending(t *testing.T) {
+ room := makeTestRoom("r", ConnectionLocal)
+ sharerID := xid.New()
+ room.SFUHosts = map[xid.ID]*SFUHost{
+ sharerID: {Pending: []xid.ID{xid.New(), xid.New()}},
+ }
+ room.closeSFUHost(sharerID)
+ assert.NotContains(t, room.SFUHosts, sharerID)
+}
+
+// --- SFUConnectionFailed ---
+
+func TestSFUConnectionFailed_UnknownRoom(t *testing.T) {
+ rooms := makeSFURooms()
+ ev := &SFUConnectionFailed{RoomID: "no-such-room", IsHost: true}
+ // Must not panic.
+ require.NoError(t, ev.Execute(rooms, ClientInfo{}))
+}
+
+func TestSFUConnectionFailed_Host_MarksUsersNotStreaming(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionLocal)
+ rooms.Rooms["r"] = room
+
+ host, _ := makeTestUser(true)
+ viewer, viewerCh := makeTestUser(false)
+ room.Users[host.ID] = host
+ room.Users[viewer.ID] = viewer
+
+ sid := xid.New()
+ room.Sessions[sid] = &RoomSession{Host: host.ID, Client: viewer.ID}
+ room.SFUHosts = map[xid.ID]*SFUHost{host.ID: {}}
+
+ ev := &SFUConnectionFailed{RoomID: "r", SharerID: host.ID, IsHost: true}
+ require.NoError(t, ev.Execute(rooms, ClientInfo{}))
+
+ assert.False(t, host.Streaming, "host should be marked not streaming")
+ assert.Empty(t, room.Sessions, "all sessions should be removed")
+ assert.NotContains(t, room.SFUHosts, host.ID)
+
+ var msg outgoing.Message
+ select {
+ case msg = <-viewerCh:
+ default:
+ t.Fatal("viewer should have received endshare")
+ }
+ _, ok := msg.(outgoing.EndShare)
+ assert.True(t, ok, "viewer should receive EndShare message")
+}
+
+func TestSFUConnectionFailed_Viewer_ClosesOneSession(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionLocal)
+ rooms.Rooms["r"] = room
+
+ host, _ := makeTestUser(true)
+ viewer1, viewer1Ch := makeTestUser(false)
+ viewer2, viewer2Ch := makeTestUser(false)
+ room.Users[host.ID] = host
+ room.Users[viewer1.ID] = viewer1
+ room.Users[viewer2.ID] = viewer2
+
+ sid1 := xid.New()
+ sid2 := xid.New()
+ room.Sessions[sid1] = &RoomSession{Host: host.ID, Client: viewer1.ID}
+ room.Sessions[sid2] = &RoomSession{Host: host.ID, Client: viewer2.ID}
+
+ ev := &SFUConnectionFailed{RoomID: "r", SharerID: host.ID, IsHost: false, SessionID: sid1}
+ require.NoError(t, ev.Execute(rooms, ClientInfo{}))
+
+ assert.NotContains(t, room.Sessions, sid1, "sid1 should be removed")
+ assert.Contains(t, room.Sessions, sid2, "sid2 should remain")
+
+ select {
+ case <-viewer1Ch:
+ // expected endshare
+ default:
+ t.Fatal("viewer1 should receive endshare")
+ }
+
+ select {
+ case <-viewer2Ch:
+ t.Fatal("viewer2 should not receive anything")
+ default:
+ // expected
+ }
+}
+
+func TestSFUConnectionFailed_Viewer_UnknownSession(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionLocal)
+ rooms.Rooms["r"] = room
+
+ ev := &SFUConnectionFailed{RoomID: "r", IsHost: false, SessionID: xid.New()}
+ // Must not panic on unknown session.
+ require.NoError(t, ev.Execute(rooms, ClientInfo{}))
+}
+
+// --- buildICEServers ---
+
+func TestBuildICEServers_Local(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionLocal)
+ v4 := net.ParseIP("1.2.3.4")
+
+ out, cfg := buildICEServers(room, rooms, "k", v4, nil, nil)
+ assert.Empty(t, out)
+ assert.Empty(t, cfg.ICEServers)
+}
+
+func TestBuildICEServers_STUN(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionSTUN)
+ v4 := net.ParseIP("1.2.3.4")
+
+ out, cfg := buildICEServers(room, rooms, "k", v4, nil, nil)
+ require.Len(t, out, 1)
+ require.Len(t, cfg.ICEServers, 1)
+ assert.Equal(t, out[0].URLs, cfg.ICEServers[0].URLs)
+ assert.Contains(t, out[0].URLs[0], "stun:")
+ assert.Contains(t, out[0].URLs[0], "1.2.3.4")
+ assert.Contains(t, out[0].URLs[0], "3478")
+}
+
+func TestBuildICEServers_TURN_HasCredentials(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionTURN)
+ v4 := net.ParseIP("1.2.3.4")
+ clientAddr := net.ParseIP("9.9.9.9")
+
+ out, cfg := buildICEServers(room, rooms, "mykey", v4, nil, clientAddr)
+
+ // Browser gets TURN credentials.
+ require.Len(t, out, 1)
+ assert.NotEmpty(t, out[0].Username)
+ assert.NotEmpty(t, out[0].Credential)
+ assert.Contains(t, out[0].URLs[0], "turn:")
+
+ // Pion gets STUN only — it is the server endpoint, never needs its own relay.
+ require.Len(t, cfg.ICEServers, 1)
+ assert.Contains(t, cfg.ICEServers[0].URLs[0], "stun:")
+ assert.Empty(t, cfg.ICEServers[0].Username, "pion should not get TURN credentials")
+}
+
+func TestBuildICEServers_STUN_WithV6(t *testing.T) {
+ rooms := makeSFURooms()
+ room := makeTestRoom("r", ConnectionSTUN)
+ v4 := net.ParseIP("1.2.3.4")
+ v6 := net.ParseIP("2001:db8::1")
+
+ out, _ := buildICEServers(room, rooms, "k", v4, v6, nil)
+ require.Len(t, out, 1)
+ // Both v4 and v6 URLs in the same server entry.
+ assert.Len(t, out[0].URLs, 2)
+}
+
+// --- SFUMode=false leaves P2P path unchanged ---
+
+func TestShareEvent_SFUMode_QueuesExistingViewers(t *testing.T) {
+ rooms := makeSFURooms()
+ rooms.webrtcAPI = newWebRTCAPI()
+ room := makeTestRoom("r", ConnectionLocal)
+ rooms.Rooms["r"] = room
+
+ host, _ := makeTestUser(false)
+ viewer1, _ := makeTestUser(false)
+ viewer2, _ := makeTestUser(false)
+ room.Users[host.ID] = host
+ room.Users[viewer1.ID] = viewer1
+ room.Users[viewer2.ID] = viewer2
+ rooms.connected[host.ID] = "r"
+ rooms.connected[viewer1.ID] = "r"
+ rooms.connected[viewer2.ID] = "r"
+
+ ev := &StartShare{}
+ require.NoError(t, ev.Execute(rooms, ClientInfo{ID: host.ID}))
+
+ // Both viewers should be queued in the sharer's SFUHost entry.
+ require.Contains(t, room.SFUHosts, host.ID)
+ pending := room.SFUHosts[host.ID].Pending
+ assert.Len(t, pending, 2)
+ assert.Contains(t, pending, viewer1.ID)
+ assert.Contains(t, pending, viewer2.ID)
+ assert.NotContains(t, pending, host.ID)
+}
+
+func TestShareEvent_P2PMode_UsesNewSession(t *testing.T) {
+ rooms := makeSFURooms()
+ rooms.config.SFUMode = false
+ rooms.webrtcAPI = nil // must stay nil in P2P mode
+
+ room := makeTestRoom("r", ConnectionLocal)
+ rooms.Rooms["r"] = room
+ rooms.connected[xid.New()] = "r" // host connected
+
+ host, _ := makeTestUser(true)
+ viewer, _ := makeTestUser(false)
+ hostID := host.ID
+ room.Users[hostID] = host
+ room.Users[viewer.ID] = viewer
+ rooms.connected[hostID] = "r"
+ rooms.connected[viewer.ID] = "r"
+
+ host.Streaming = false
+ ev := &StartShare{}
+ require.NoError(t, ev.Execute(rooms, ClientInfo{ID: hostID}))
+
+ // P2P: one session created per viewer.
+ assert.Len(t, room.Sessions, 1)
+ // SFU fields must remain untouched.
+ assert.Empty(t, room.SFUHosts)
+}