@@ -110,7 +110,35 @@ func (h *Hub) run() {
110110 }
111111}
112112
113- // NewHub a pointer to an initialized Hub
113+ // EventSourceHandler implements the Handler interface
114+ func (h * Hub ) EventSourceHandler (w http.ResponseWriter , req * http.Request ) {
115+ token := req .URL .Path [len (ssePath ):]
116+
117+ if h .userExists (token ) {
118+ log .Println ("[Info] Forbiden, user already connected" )
119+ http .Error (w , "Forbiden" , http .StatusForbidden )
120+ } else {
121+ log .Println ("[Info] Exchange token against the channel list" , token )
122+ val , err := h .client .Getset (token , []byte {})
123+ if err != nil {
124+ log .Println ("[Error] occured while exchanging the your security token." , token , ":" , err )
125+ http .Error (w , "Error occured while exchanging the your security token" , http .StatusUnauthorized )
126+ } else if chanName := string (val ); chanName != "" {
127+ log .Println ("[Info] Connecting" , token , "to the channel" , chanName )
128+ h .register <- Connection {token , chanName }
129+ defer func (u string ) {
130+ h .unregister <- u
131+ }(token )
132+ h .srv .Handler (token )(w , req )
133+ }
134+ _ , err = h .client .Del (token )
135+ if err != nil {
136+ log .Println ("[Error] An error occured while trying to delete the token from redis" , err )
137+ }
138+ }
139+ }
140+
141+ // NewHub returns a pointer to a initialized and running Hub
114142func NewHub () * Hub {
115143 redisURLString := os .Getenv ("REDIS_SSEQUEUE_URL" )
116144 if redisURLString == "" {
@@ -140,6 +168,7 @@ func NewHub() *Hub {
140168 srv : server ,
141169 client : goredis.Client {Addr : redisURL .Host , Db : redisDb },
142170 }
171+ go h .run ()
143172 return & h
144173}
145174
@@ -152,31 +181,9 @@ func main() {
152181
153182 log .Println ("[Info] Starting the eventsource Hub" )
154183 h := NewHub ()
155- go h .run ()
156184
157185 // eventsource endpoints
158- http .HandleFunc (ssePath , func (w http.ResponseWriter , req * http.Request ) {
159- token := req .URL .Path [len (ssePath ):]
160-
161- if h .userExists (token ) {
162- log .Println ("[Info] Forbiden, user already connected" )
163- http .Error (w , "Forbiden" , http .StatusForbidden )
164- } else {
165- log .Println ("[Info] Exchange token against the channel list" )
166- val , err := h .client .Getset (token , []byte {})
167- if err != nil {
168- log .Println ("[Error] occured while exchanging the your security token." )
169- http .Error (w , "Error occured while exchanging the your security token" , http .StatusUnauthorized )
170- } else if chanName := string (val ); chanName != "" {
171- log .Println ("[Info] Connecting" , token , "to the channel" , chanName )
172- h .register <- Connection {token , chanName }
173- defer func (u string ) {
174- h .unregister <- u
175- }(token )
176- h .srv .Handler (token )(w , req )
177- }
178- }
179- })
186+ http .HandleFunc (ssePath , h .EventSourceHandler )
180187
181188 log .Fatalln (http .ListenAndServe (sseString , nil ))
182189}
0 commit comments