@@ -2,24 +2,26 @@ package main
22
33import (
44 "encoding/json"
5- "fmt"
65 "log"
76 "net/http"
87 nurl "net/url"
98 "os"
109 "strconv"
1110 "strings"
12- // "reflect"
1311
14- "github.com/codegangsta/martini"
1512 "github.com/donovanhide/eventsource"
1613 "github.com/monnand/goredis"
1714)
1815
16+ const (
17+ // ssePath is the PATH for the eventsource handler is going to be mounted on
18+ ssePath = "/push/"
19+ )
20+
1921// Message is the bit of information that is transfered via eventsource
2022type Message struct {
2123 Idx string
22- Channel , Html string
24+ Channel , HTML string
2325}
2426
2527// Id is required to implement the eventsource.Event interface
@@ -30,7 +32,7 @@ func (c *Message) Event() string { return c.Channel }
3032
3133// Data is required to implement the eventsource.Event interface
3234func (c * Message ) Data () string {
33- return c .Html
35+ return c .HTML
3436}
3537
3638// Connection is use to relate a user token to a channel
@@ -56,7 +58,7 @@ func (h *Hub) userExists(token string) bool {
5658}
5759
5860func (h * Hub ) run () {
59- fmt .Println ("Start the Hub" )
61+ log .Println ("[Info] Start the Hub" )
6062 var payload [3 ]string
6163 psub := make (chan string , 0 )
6264 go h .client .Subscribe (nil , nil , psub , nil , h .messages )
@@ -67,15 +69,12 @@ func (h *Hub) run() {
6769 for {
6870 select {
6971 case conn := <- h .register :
70- fmt .Println ("register user: " , conn .token )
71- // TODO try to get the channel
72+ log .Println ("[Info] register user: " , conn .token )
7273 h .Users [conn .token ] = conn .channel
73- //fmt.Println("[DEBUG] After h.Users assignment", h.Users[conn.token])
7474 h .Data [conn .channel ] = append (h .Data [conn .channel ], conn .token )
75- //fmt.Println("[DEBUG] After h.Data assignment", h.Data[conn.channel])
7675
7776 case token := <- h .unregister :
78- fmt .Println ("unregister user: " , token )
77+ log .Println ("[Info] Unregister user: " , token )
7978 ch , ok := h .Users [token ]
8079 if ok {
8180 delete (h .Users , token )
@@ -85,16 +84,16 @@ func (h *Hub) run() {
8584 case msg := <- h .messages :
8685 err := json .Unmarshal (msg .Message , & payload )
8786 if err != nil {
88- fmt .Println ("[Error] An error occured while Unmarshalling the msg: " , msg )
87+ log .Println ("[Error] An error occured while Unmarshalling the msg: " , msg )
8988 }
9089 message := & Message {
9190 Idx : payload [2 ],
9291 Channel : payload [0 ],
93- Html : payload [1 ],
92+ HTML : payload [1 ],
9493 }
9594 val , ok := h .Data [msg .Channel ]
9695 if ok && len (val ) >= 1 {
97- fmt .Println ("[DEBUG ] msg sent to tokens" , val )
96+ log .Println ("[Info ] msg sent to tokens" , val )
9897 h .srv .Publish (val , message )
9998 }
10099 }
@@ -103,18 +102,20 @@ func (h *Hub) run() {
103102
104103// NewHub a pointer to an initialized Hub
105104func NewHub () * Hub {
106- redisUrlString := os .Getenv ("REDIS_SSEQUEUE_URL" )
107- if redisUrlString == "" {
108- redisUrlString = "redis://localhost:6379/2"
105+ redisURLString := os .Getenv ("REDIS_SSEQUEUE_URL" )
106+ if redisURLString == "" {
107+ // Use db 2 by default for pub/sub
108+ redisURLString = "redis://localhost:6379/2"
109109 }
110- redisUrl , err := nurl .Parse (redisUrlString )
110+ log .Println ("[Info] Redis configuration used for pub/sub" , redisURLString )
111+ redisURL , err := nurl .Parse (redisURLString )
111112 if err != nil {
112113 log .Fatal ("Could not read Redis string" , err )
113114 }
114115
115- redisDb , err := strconv .Atoi (strings .TrimLeft (redisUrl .Path , "/" ))
116+ redisDb , err := strconv .Atoi (strings .TrimLeft (redisURL .Path , "/" ))
116117 if err != nil {
117- log .Fatal ("Could not read Redis path" , err )
118+ log .Fatal ("[Error] Could not read Redis path" , err )
118119 }
119120
120121 server := eventsource .NewServer ()
@@ -127,33 +128,38 @@ func NewHub() *Hub {
127128 unregister : make (chan string , 0 ),
128129 messages : make (chan goredis.Message , 0 ),
129130 srv : server ,
130- client : goredis.Client {Addr : redisUrl .Host , Db : redisDb },
131+ client : goredis.Client {Addr : redisURL .Host , Db : redisDb },
131132 }
132- // We use the second redis database for the pub/sub
133- //h.client.Db = 2
134133 return & h
135134}
136135
137136func main () {
137+ sseString := os .Getenv ("SSE_HOST" )
138+ if sseString == "" {
139+ log .Fatal ("SSE_HOST is not set, example: SSE_HOST=localhost:3000" )
140+ }
141+ log .Println ("[Info] botbot-eventsource is listening on " + sseString )
142+
143+ log .Println ("[Info] Starting the eventsource Hub" )
138144 h := NewHub ()
139145 go h .run ()
140146
141- m := martini .Classic ()
142-
143147 // eventsource endpoints
144- m . Get ( "/push/:token" , func (w http.ResponseWriter , req * http.Request , params martini. Params ) {
145- token := params [ "token" ]
148+ http . HandleFunc ( ssePath , func (w http.ResponseWriter , req * http.Request ) {
149+ token := req . URL . Path [ len ( ssePath ): ]
146150
147151 if h .userExists (token ) {
148- // TODO proper response
149- fmt . Fprintf (w , "Not allowed -- User already connected" )
152+ log . Println ( "[Info] Forbiden, user already connected" )
153+ http . Error (w , "Forbiden" , http . StatusForbidden )
150154 } else {
151- fmt .Println ("Exchange token against the channel" )
152- ch , err := h .client .Getset (token , []byte {})
155+ log .Println ("[Info] Exchange token against the channel list " )
156+ val , err := h .client .Getset (token , []byte {})
153157 if err != nil {
154- fmt .Fprintf (w , "Not allowed -- Error occured while exchanging the token" )
155- } else {
156- h .register <- Connection {token , string (ch )}
158+ log .Println ("[Error] occured while exchanging the your security token." )
159+ http .Error (w , "Error occured while exchanging the your security token" , http .StatusUnauthorized )
160+ } else if chanName := string (val ); chanName != "" {
161+ log .Println ("[Info] Connecting" , token , "to the channel" , chanName )
162+ h .register <- Connection {token , chanName }
157163 defer func (u string ) {
158164 h .unregister <- u
159165 }(token )
@@ -162,11 +168,5 @@ func main() {
162168 }
163169 })
164170
165- sseString := os .Getenv ("SSE_HOST" )
166- if sseString == "" {
167- log .Fatal ("SSE_HOST is not set, example: SSE_HOST=localhost:3000" )
168- }
169-
170- log .Println ("listening on " + sseString )
171- log .Fatalln (http .ListenAndServe (sseString , m ))
171+ log .Fatalln (http .ListenAndServe (sseString , nil ))
172172}
0 commit comments