Skip to content

Commit e977d5b

Browse files
authored
ENH: Adds support for multiple listeners (#35)
1 parent 8437e05 commit e977d5b

11 files changed

Lines changed: 265 additions & 57 deletions

File tree

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ ENV CERTS="" \
2828
LISTENER_ADDRESS="" \
2929
MODE="default" \
3030
PROXY_INSTANCE_NAME="docker-flow" \
31+
RELOAD_ATTEMPTS="5" \
3132
RELOAD_INTERVAL="5000" REPEAT_RELOAD=false \
3233
RECONFIGURE_ATTEMPTS="20" \
3334
SEPARATOR="," \

Dockerfile.linux-arm

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ ENV CERTS="" \
1919
LISTENER_ADDRESS="" \
2020
MODE="default" \
2121
PROXY_INSTANCE_NAME="docker-flow" \
22+
RELOAD_ATTEMPTS="5" \
2223
RELOAD_INTERVAL="5000" REPEAT_RELOAD=false \
2324
RECONFIGURE_ATTEMPTS="20" \
2425
SEPARATOR="," \

actions/fetch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package actions
22

33
import (
4-
"../proxy"
54
"encoding/json"
65
"fmt"
76
"net/http"
87
"strings"
8+
9+
"../proxy"
910
)
1011

1112
// Fetchable defines interface that fetches information from other sources
@@ -46,6 +47,7 @@ func (m *fetch) ReloadConfig(baseData BaseReconfigure, listenerAddr string) erro
4647
if err = json.NewDecoder(resp.Body).Decode(&services); err != nil {
4748
return err
4849
}
50+
4951
needsReload := false
5052
for _, s := range services {
5153
proxyService := proxy.GetServiceFromMap(&s)

args.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package main
22

33
import (
4-
"github.com/jessevdk/go-flags"
54
"os"
5+
6+
"github.com/jessevdk/go-flags"
67
)
78

89
type args struct{}

args_test.go

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ package main
44

55
import (
66
"fmt"
7-
"github.com/stretchr/testify/suite"
87
"net/http"
98
"os"
109
"testing"
10+
11+
"github.com/stretchr/testify/suite"
1112
)
1213

1314
type ArgsTestSuite struct {
@@ -49,6 +50,7 @@ func (s ArgsTestSuite) Test_Parse_ParsesServerLongArgs() {
4950
for _, d := range data {
5051
s.Equal(d.expected, *d.value)
5152
}
53+
s.Len(serverImpl.ListenerAddresses, 1)
5254
}
5355

5456
func (s ArgsTestSuite) Test_Parse_ParsesServerShortArgs() {
@@ -69,12 +71,11 @@ func (s ArgsTestSuite) Test_Parse_ParsesServerShortArgs() {
6971
for _, d := range data {
7072
s.Equal(d.expected, *d.value)
7173
}
74+
s.Len(serverImpl.ListenerAddresses, 1)
7275
}
7376

7477
func (s ArgsTestSuite) Test_Parse_ServerHasDefaultValues() {
7578
os.Args = []string{"myProgram", "server"}
76-
os.Unsetenv("IP")
77-
os.Unsetenv("PORT")
7879
data := []struct {
7980
expected string
8081
value *string
@@ -87,6 +88,7 @@ func (s ArgsTestSuite) Test_Parse_ServerHasDefaultValues() {
8788
for _, d := range data {
8889
s.Equal(d.expected, *d.value)
8990
}
91+
s.Len(serverImpl.ListenerAddresses, 1)
9092
}
9193

9294
func (s ArgsTestSuite) Test_Parse_ServerDefaultsToEnvVars() {
@@ -103,10 +105,74 @@ func (s ArgsTestSuite) Test_Parse_ServerDefaultsToEnvVars() {
103105
for _, d := range data {
104106
os.Setenv(d.key, d.expected)
105107
}
108+
defer func() {
109+
for _, d := range data {
110+
os.Unsetenv(d.key)
111+
}
112+
}()
113+
106114
args{}.parse()
107115
for _, d := range data {
108116
s.Equal(d.expected, *d.value)
109117
}
118+
119+
s.Len(serverImpl.ListenerAddresses, 1)
120+
}
121+
122+
func (s ArgsTestSuite) Test_Parse_ParsesListnerAddressShortArgs() {
123+
dataTable := []struct {
124+
value []string
125+
expected []string
126+
}{
127+
{[]string{"-l", "dfsl1", "-l", "dfsl2"}, []string{"dfsl1", "dfsl2"}},
128+
{[]string{"-l", "dfsl1"}, []string{"dfsl1"}},
129+
}
130+
131+
rootArgs := []string{"myProgram", "server"}
132+
for _, data := range dataTable {
133+
os.Args = append(rootArgs, data.value...)
134+
args{}.parse()
135+
s.Require().Equal(data.expected, serverImpl.ListenerAddresses)
136+
}
137+
}
138+
139+
func (s ArgsTestSuite) Test_Parse_ParsesListnerAddressLongArgs() {
140+
dataTable := []struct {
141+
value []string
142+
expected []string
143+
}{
144+
{[]string{"--listener-address", "dfsl1", "--listener-address", "dfsl2"}, []string{"dfsl1", "dfsl2"}},
145+
{[]string{"--listener-address", "dfsl1"}, []string{"dfsl1"}},
146+
}
147+
148+
rootArgs := []string{"myProgram", "server"}
149+
for _, data := range dataTable {
150+
os.Args = append(rootArgs, data.value...)
151+
args{}.parse()
152+
s.Require().Equal(data.expected, serverImpl.ListenerAddresses)
153+
}
154+
}
155+
156+
func (s ArgsTestSuite) Test_Parse_ParsesListnerAddressEnvVars() {
157+
os.Args = []string{"myProgram", "server"}
158+
dataTable := []struct {
159+
value string
160+
expected []string
161+
}{
162+
{"dfsl1,dfsl2", []string{"dfsl1", "dfsl2"}},
163+
{"dfsl1", []string{"dfsl1"}},
164+
}
165+
166+
defer func() {
167+
os.Unsetenv("LISTENER_ADDRESS")
168+
}()
169+
170+
for _, data := range dataTable {
171+
os.Setenv("LISTENER_ADDRESS", data.value)
172+
args{}.parse()
173+
s.Require().Equal(data.expected, serverImpl.ListenerAddresses)
174+
}
175+
110176
}
111177

112178
// Suite

docs/config.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ The following environment variables can be used to configure the *Docker Flow Pr
3131
|EXTRA_FRONTEND |Value will be added to the default `frontend` configuration. Multiple lines should be separated with comma (*,*).|
3232
|EXTRA_GLOBAL |Value will be added to the default `global` configuration. Multiple lines should be separated with comma (*,*).|
3333
|HTTPS_ONLY |If set to true, all requests to all services will be redirected to HTTPS.<br>**Example:** `true`<br>**Default Value:** `false`|
34-
|LISTENER_ADDRESS |The address of the [Docker Flow: Swarm Listener](https://github.com/docker-flow/docker-flow-swarm-listener) used for automatic proxy configuration.<br>**Example:** `swarm-listener:8080`|
34+
|LISTENER_ADDRESS |The address of the [Docker Flow: Swarm Listener](https://github.com/docker-flow/docker-flow-swarm-listener) used for automatic proxy configuration. Multiple values can be separated with comma (`,`). When set to multiple values, the proxy will query each address in order.<br>**Example:** `swarm-listener`|
3535
PROXY_INSTANCE_NAME|The name of the proxy instance. Useful if multiple proxies are running inside a cluster.<br>**Default value:** `docker-flow`|
3636
|RECONFIGURE_ATTEMPTS|The number of attempts the proxy will try to reconfigure itself before giving up and removing the offending service. The period between reconfigure attempts is 1 second.<br>**Example:** `15`<br>**Default value:** `20`|
37+
|RELOAD_ATTEMPTS |The number of attempts the proxy will query a listener addresss during startup. Only used when LISTENER_ADDRESS is a comma seperated list of addresses.<br>**Default value:** `5`|
3738
|RELOAD_INTERVAL |Defines the frequency (in milliseconds) between automatic config reloads from Swarm Listener.<br>**Default value:** `5000`|
3839
|REPEAT_RELOAD |If set to `true`, the proxy will periodically reload the config, using `RELOAD_INTERVAL` as pause between iterations.<br>**Example:** `true`<br>**Default value:** `false`|
3940
|RESOLVERS |The list of resolvers separated with comma (`,`). The `CHECK_RESOLVERS` environment variable must be set to `true`.<br>**Example:** `nameserver dns-0 4.4.2.1:53,nameserver dns-1 8.8.8.8:53`|

server.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,18 @@ type Server interface {
2323
}
2424

2525
type serve struct {
26-
IP string `short:"i" long:"ip" default:"0.0.0.0" env:"IP" description:"IP the server listens to."`
27-
ListenerAddress string `short:"l" long:"listener-address" env:"LISTENER_ADDRESS" description:"The address of the Docker Flow: Swarm Listener. The address matches the name of the Swarm service (e.g. swarm-listener)"`
28-
Port string `short:"p" long:"port" default:"8080" env:"PORT" description:"Port the server listens to."`
29-
ServiceName string `short:"n" long:"service-name" default:"proxy" env:"SERVICE_NAME" description:"The name of the proxy service. It is used only when running in 'swarm' mode and must match the '--name' parameter used to launch the service."`
26+
IP string `short:"i" long:"ip" default:"0.0.0.0" env:"IP" description:"IP the server listens to."`
27+
ListenerAddresses []string `short:"l" long:"listener-address" env:"LISTENER_ADDRESS" env-delim:"," description:"The address of the Docker Flow: Swarm Listener. The address matches the name of the Swarm service (e.g. swarm-listener)" default:""`
28+
Port string `short:"p" long:"port" default:"8080" env:"PORT" description:"Port the server listens to."`
29+
ServiceName string `short:"n" long:"service-name" default:"proxy" env:"SERVICE_NAME" description:"The name of the proxy service. It is used only when running in 'swarm' mode and must match the '--name' parameter used to launch the service."`
3030
SuccessfulInitReload bool
3131
// TODO: Remove
3232
actions.BaseReconfigure
3333
}
3434

35-
var serverImpl = serve{}
35+
var serverImpl = serve{
36+
ListenerAddresses: []string{},
37+
}
3638
var cert server.Certer = server.NewCert("/certs")
3739

3840
// Execute runs the Web server.
@@ -46,7 +48,7 @@ func (m *serve) Execute(args []string) error {
4648
address := fmt.Sprintf("%s:%s", m.IP, m.Port)
4749
cert.Init()
4850
var server2 = server.NewServer(
49-
m.ListenerAddress,
51+
m.ListenerAddresses,
5052
m.Port,
5153
m.ServiceName,
5254
m.ConfigsPath,
@@ -77,12 +79,10 @@ func (m *serve) Execute(args []string) error {
7779
}
7880

7981
func (m *serve) reconfigure(server server.Server) error {
80-
lAddr := ""
81-
if len(m.ListenerAddress) > 0 {
82-
lAddr = fmt.Sprintf("http://%s:8080", m.ListenerAddress)
83-
}
8482
fetch := actions.NewFetch(m.BaseReconfigure)
85-
if len(lAddr) > 0 {
83+
84+
if len(m.ListenerAddresses) == 1 && len(m.ListenerAddresses[0]) > 0 {
85+
lAddr := fmt.Sprintf("http://%s:8080", m.ListenerAddresses[0])
8686
go func() {
8787
retryInterval := os.Getenv("RELOAD_INTERVAL")
8888
interval, _ := time.ParseDuration(retryInterval + "ms")
@@ -105,6 +105,41 @@ func (m *serve) reconfigure(server server.Server) error {
105105
}()
106106
}
107107

108+
// Handlers Listener Addresses
109+
if len(m.ListenerAddresses) > 1 {
110+
reloadAttemptsStr := os.Getenv("RELOAD_ATTEMPTS")
111+
retryInterval := os.Getenv("RELOAD_INTERVAL")
112+
interval, _ := time.ParseDuration(retryInterval + "ms")
113+
for _, addr := range m.ListenerAddresses {
114+
if len(addr) == 0 {
115+
continue
116+
}
117+
lAddr := fmt.Sprintf("http://%s:8080", addr)
118+
go func(lAddr string) {
119+
reloadAttempts, err := strconv.ParseInt(reloadAttemptsStr, 10, 64)
120+
if err != nil {
121+
reloadAttempts = 5
122+
}
123+
for range time.Tick(interval) {
124+
if err := fetch.ReloadConfig(m.BaseReconfigure, lAddr); err != nil {
125+
logPrintf(
126+
"Error: Fetching config from swarm listener failed: %s. Will retry in %d seconds.",
127+
err.Error(),
128+
interval/time.Second,
129+
)
130+
} else {
131+
m.SuccessfulInitReload = true
132+
break
133+
}
134+
reloadAttempts = reloadAttempts - 1
135+
if reloadAttempts <= 0 {
136+
break
137+
}
138+
}
139+
}(lAddr)
140+
}
141+
}
142+
108143
services := server.GetServicesFromEnvVars()
109144

110145
for _, service := range *services {

server/cert_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package server
22

33
import (
4-
"../proxy"
54
"encoding/json"
65
"fmt"
7-
"github.com/stretchr/testify/mock"
8-
"github.com/stretchr/testify/suite"
96
"io/ioutil"
107
"net"
118
"net/http"
@@ -14,6 +11,10 @@ import (
1411
"path/filepath"
1512
"strings"
1613
"testing"
14+
15+
"../proxy"
16+
"github.com/stretchr/testify/mock"
17+
"github.com/stretchr/testify/suite"
1718
)
1819

1920
type CertTestSuite struct {

server/server.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,23 @@ const (
3030
)
3131

3232
type serve struct {
33-
listenerAddress string
34-
port string
35-
serviceName string
36-
configsPath string
37-
templatesPath string
38-
cert Certer
33+
listenerAddresses []string
34+
port string
35+
serviceName string
36+
configsPath string
37+
templatesPath string
38+
cert Certer
3939
}
4040

4141
// NewServer returns instance of the Server with populated data
42-
var NewServer = func(listenerAddr, port, serviceName, configsPath, templatesPath string, cert Certer) Server {
42+
var NewServer = func(listenerAddr []string, port, serviceName, configsPath, templatesPath string, cert Certer) Server {
4343
return &serve{
44-
listenerAddress: listenerAddr,
45-
port: port,
46-
serviceName: serviceName,
47-
configsPath: configsPath,
48-
templatesPath: templatesPath,
49-
cert: cert,
44+
listenerAddresses: listenerAddr,
45+
port: port,
46+
serviceName: serviceName,
47+
configsPath: configsPath,
48+
templatesPath: templatesPath,
49+
cert: cert,
5050
}
5151
}
5252

@@ -143,22 +143,29 @@ func (m *serve) ReloadHandler(w http.ResponseWriter, req *http.Request) {
143143
req.ParseForm()
144144
params := new(reloadParams)
145145
decoder.Decode(params, req.Form)
146-
listenerAddr := ""
147146
response := Response{
148147
Status: "OK",
149148
}
150-
if params.FromListener {
151-
listenerAddr = m.listenerAddress
152-
}
149+
153150
//MW: I've reconstructed original behavior. BUT.
154151
//shouldn't reload call ReloadServicesFromRegistry not just
155152
//reload in else, if so ReloadClusterConfig & ReloadServicesFromRegistry
156153
//could be enclosed in one method
157-
if len(listenerAddr) > 0 {
158-
fetch := actions.NewFetch(m.getBaseReconfigure())
159-
if err := fetch.ReloadClusterConfig(listenerAddr); err != nil {
160-
logPrintf("Error: ReloadClusterConfig failed: %s", err.Error())
161-
m.writeInternalServerError(w, &Response{}, err.Error())
154+
if params.FromListener {
155+
errs := []string{}
156+
for _, listenerAddr := range m.listenerAddresses {
157+
if len(listenerAddr) == 0 {
158+
continue
159+
}
160+
fetch := actions.NewFetch(m.getBaseReconfigure())
161+
if err := fetch.ReloadClusterConfig(listenerAddr); err != nil {
162+
errs = append(errs, err.Error())
163+
logPrintf("Error: ReloadClusterConfig failed: %s", err.Error())
164+
}
165+
}
166+
if len(errs) != 0 {
167+
errMsg := strings.Join(errs, " ,")
168+
m.writeInternalServerError(w, &Response{}, errMsg)
162169
} else {
163170
w.WriteHeader(http.StatusOK)
164171
}

0 commit comments

Comments
 (0)