Skip to content

Commit 04a15f0

Browse files
Merge pull request #45 from Azure/release/v1.2.0
Merge release/v1.2.0 to main 08/19
2 parents 64610d0 + dea7ddc commit 04a15f0

12 files changed

Lines changed: 1536 additions & 111 deletions

File tree

azureappconfiguration/azureappconfiguration.go

Lines changed: 230 additions & 65 deletions
Large diffs are not rendered by default.

azureappconfiguration/azureappconfiguration_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,10 @@ func TestLoadKeyValues_InvalidJson(t *testing.T) {
271271
err := azappcfg.loadKeyValues(ctx, mockClient)
272272
assert.NoError(t, err)
273273

274-
assert.Len(t, azappcfg.keyValues, 1)
274+
assert.Len(t, azappcfg.keyValues, 2)
275275
assert.Equal(t, &value1, azappcfg.keyValues["key1"])
276-
// The invalid JSON key should be skipped
277-
_, exists := azappcfg.keyValues["key2"]
278-
assert.False(t, exists)
276+
// The invalid JSON key should be treated as a plain string
277+
assert.Equal(t, &value2, azappcfg.keyValues["key2"])
279278
}
280279

281280
func TestDeduplicateSelectors(t *testing.T) {

azureappconfiguration/client_manager.go

Lines changed: 251 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,16 @@
44
package azureappconfiguration
55

66
import (
7+
"context"
78
"fmt"
9+
"log"
10+
"math"
11+
"math/rand"
12+
"net"
13+
"net/url"
14+
"strconv"
815
"strings"
16+
"time"
917

1018
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1119
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
@@ -14,24 +22,40 @@ import (
1422

1523
// configurationClientManager handles creation and management of app configuration clients
1624
type configurationClientManager struct {
17-
clientOptions *azappconfig.ClientOptions
18-
staticClient *configurationClientWrapper
19-
endpoint string
20-
credential azcore.TokenCredential
21-
secret string
22-
id string
25+
replicaDiscoveryEnabled bool
26+
clientOptions *azappconfig.ClientOptions
27+
staticClient *configurationClientWrapper
28+
dynamicClients []*configurationClientWrapper
29+
endpoint string
30+
validDomain string
31+
credential azcore.TokenCredential
32+
secret string
33+
id string
34+
lastFallbackClientAttempt time.Time
35+
lastFallbackClientRefresh time.Time
2336
}
2437

2538
// configurationClientWrapper wraps an Azure App Configuration client with additional metadata
2639
type configurationClientWrapper struct {
27-
endpoint string
28-
client *azappconfig.Client
40+
endpoint string
41+
client *azappconfig.Client
42+
backOffEndTime time.Time
43+
failedAttempts int
44+
}
45+
46+
type clientManager interface {
47+
getClients(ctx context.Context) ([]*configurationClientWrapper, error)
48+
refreshClients(ctx context.Context)
2949
}
3050

3151
// newConfigurationClientManager creates a new configuration client manager
32-
func newConfigurationClientManager(authOptions AuthenticationOptions, clientOptions *azappconfig.ClientOptions) (*configurationClientManager, error) {
52+
func newConfigurationClientManager(authOptions AuthenticationOptions, options *Options) (*configurationClientManager, error) {
3353
manager := &configurationClientManager{
34-
clientOptions: setTelemetry(clientOptions),
54+
clientOptions: setTelemetry(options.ClientOptions),
55+
}
56+
57+
if options.ReplicaDiscoveryEnabled == nil || *options.ReplicaDiscoveryEnabled {
58+
manager.replicaDiscoveryEnabled = true
3559
}
3660

3761
// Create client based on authentication options
@@ -75,7 +99,7 @@ func (manager *configurationClientManager) initializeClient(authOptions Authenti
7599
manager.credential = authOptions.Credential
76100
}
77101

78-
// Initialize the static client wrapper
102+
manager.validDomain = getValidDomain(manager.endpoint)
79103
manager.staticClient = &configurationClientWrapper{
80104
endpoint: manager.endpoint,
81105
client: staticClient,
@@ -84,6 +108,164 @@ func (manager *configurationClientManager) initializeClient(authOptions Authenti
84108
return nil
85109
}
86110

111+
func (manager *configurationClientManager) getClients(ctx context.Context) ([]*configurationClientWrapper, error) {
112+
currentTime := time.Now()
113+
clients := make([]*configurationClientWrapper, 0, 1+len(manager.dynamicClients))
114+
115+
// Add the static client if it is not in backoff
116+
if currentTime.After(manager.staticClient.backOffEndTime) {
117+
clients = append(clients, manager.staticClient)
118+
}
119+
120+
if !manager.replicaDiscoveryEnabled {
121+
return clients, nil
122+
}
123+
124+
if currentTime.After(manager.lastFallbackClientAttempt.Add(minimalClientRefreshInterval)) &&
125+
(manager.dynamicClients == nil ||
126+
currentTime.After(manager.lastFallbackClientRefresh.Add(fallbackClientRefreshExpireInterval))) {
127+
manager.lastFallbackClientAttempt = currentTime
128+
url, _ := url.Parse(manager.endpoint)
129+
manager.discoverFallbackClients(url.Host)
130+
}
131+
132+
for _, clientWrapper := range manager.dynamicClients {
133+
if currentTime.After(clientWrapper.backOffEndTime) {
134+
clients = append(clients, clientWrapper)
135+
}
136+
}
137+
138+
return clients, nil
139+
}
140+
141+
func (manager *configurationClientManager) refreshClients(ctx context.Context) {
142+
currentTime := time.Now()
143+
if manager.replicaDiscoveryEnabled &&
144+
currentTime.After(manager.lastFallbackClientAttempt.Add(minimalClientRefreshInterval)) {
145+
manager.lastFallbackClientAttempt = currentTime
146+
url, _ := url.Parse(manager.endpoint)
147+
manager.discoverFallbackClients(url.Host)
148+
}
149+
}
150+
151+
func (manager *configurationClientManager) discoverFallbackClients(host string) {
152+
go func() {
153+
defer func() {
154+
if r := recover(); r != nil {
155+
log.Printf("panic in replica discovery: %v", r)
156+
}
157+
}()
158+
159+
discoveryCtx, cancel := context.WithTimeout(context.Background(), failoverTimeout)
160+
defer cancel()
161+
162+
srvTargetHosts, err := querySrvTargetHost(discoveryCtx, host)
163+
if err != nil {
164+
log.Printf("failed to discover fallback clients for %s: %v", host, err)
165+
return
166+
}
167+
168+
manager.processSrvTargetHosts(srvTargetHosts)
169+
}()
170+
}
171+
172+
func (manager *configurationClientManager) processSrvTargetHosts(srvTargetHosts []string) {
173+
// Shuffle the list of SRV target hosts for load balancing
174+
rand.Shuffle(len(srvTargetHosts), func(i, j int) {
175+
srvTargetHosts[i], srvTargetHosts[j] = srvTargetHosts[j], srvTargetHosts[i]
176+
})
177+
178+
newDynamicClients := make([]*configurationClientWrapper, 0, len(srvTargetHosts))
179+
for _, host := range srvTargetHosts {
180+
if isValidEndpoint(host, manager.validDomain) {
181+
targetEndpoint := "https://" + host
182+
if strings.EqualFold(targetEndpoint, manager.endpoint) {
183+
continue // Skip primary endpoint
184+
}
185+
client, err := manager.newConfigurationClient(targetEndpoint)
186+
if err != nil {
187+
log.Printf("failed to create client for replica %s: %v", targetEndpoint, err)
188+
continue // Continue with other replicas instead of returning
189+
}
190+
newDynamicClients = append(newDynamicClients, &configurationClientWrapper{
191+
endpoint: targetEndpoint,
192+
client: client,
193+
})
194+
}
195+
}
196+
197+
manager.dynamicClients = newDynamicClients
198+
manager.lastFallbackClientRefresh = time.Now()
199+
}
200+
201+
func querySrvTargetHost(ctx context.Context, host string) ([]string, error) {
202+
results := make([]string, 0)
203+
204+
_, originRecords, err := net.DefaultResolver.LookupSRV(ctx, originKey, tcpKey, host)
205+
if err != nil {
206+
// If the host does not have SRV records => no replicas
207+
if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
208+
return results, nil
209+
} else {
210+
return results, err
211+
}
212+
}
213+
214+
if len(originRecords) == 0 {
215+
return results, nil
216+
}
217+
218+
originHost := strings.TrimSuffix(originRecords[0].Target, ".")
219+
results = append(results, originHost)
220+
index := 0
221+
for {
222+
currentAlt := altKey + strconv.Itoa(index)
223+
_, altRecords, err := net.DefaultResolver.LookupSRV(ctx, currentAlt, tcpKey, originHost)
224+
if err != nil {
225+
// If the host does not have SRV records => no more replicas
226+
if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
227+
break
228+
} else {
229+
return results, err
230+
}
231+
}
232+
233+
for _, record := range altRecords {
234+
altHost := strings.TrimSuffix(record.Target, ".")
235+
if altHost != "" {
236+
results = append(results, altHost)
237+
}
238+
}
239+
index = index + 1
240+
}
241+
242+
return results, nil
243+
}
244+
245+
func (manager *configurationClientManager) newConfigurationClient(endpoint string) (*azappconfig.Client, error) {
246+
if manager.credential != nil {
247+
return azappconfig.NewClient(endpoint, manager.credential, manager.clientOptions)
248+
}
249+
250+
connectionStr := buildConnectionString(endpoint, manager.secret, manager.id)
251+
if connectionStr == "" {
252+
return nil, fmt.Errorf("failed to build connection string for fallback client")
253+
}
254+
255+
return azappconfig.NewClientFromConnectionString(connectionStr, manager.clientOptions)
256+
}
257+
258+
func buildConnectionString(endpoint string, secret string, id string) string {
259+
if secret == "" || id == "" {
260+
return ""
261+
}
262+
263+
return fmt.Sprintf("%s=%s;%s=%s;%s=%s",
264+
endpointKey, endpoint,
265+
idKey, id,
266+
secretKey, secret)
267+
}
268+
87269
// parseConnectionString extracts a named value from a connection string
88270
func parseConnectionString(connectionString string, token string) (string, error) {
89271
if connectionString == "" {
@@ -125,3 +307,61 @@ func setTelemetry(options *azappconfig.ClientOptions) *azappconfig.ClientOptions
125307

126308
return options
127309
}
310+
311+
func getValidDomain(endpoint string) string {
312+
url, _ := url.Parse(endpoint)
313+
TrustedDomainLabels := []string{azConfigDomainLabel, appConfigDomainLabel}
314+
for _, label := range TrustedDomainLabels {
315+
index := strings.LastIndex(strings.ToLower(url.Host), strings.ToLower(label))
316+
if index != -1 {
317+
return url.Host[index:]
318+
}
319+
}
320+
321+
return ""
322+
}
323+
324+
func isValidEndpoint(host string, validDomain string) bool {
325+
if validDomain == "" {
326+
return false
327+
}
328+
329+
return strings.HasSuffix(strings.ToLower(host), strings.ToLower(validDomain))
330+
}
331+
332+
func (client *configurationClientWrapper) updateBackoffStatus(success bool) {
333+
if success {
334+
client.failedAttempts = 0
335+
client.backOffEndTime = time.Time{}
336+
} else {
337+
client.failedAttempts++
338+
client.backOffEndTime = time.Now().Add(client.getBackoffDuration())
339+
}
340+
}
341+
342+
func (client *configurationClientWrapper) getBackoffDuration() time.Duration {
343+
if client.failedAttempts <= 1 {
344+
return minBackoffDuration
345+
}
346+
347+
// Cap the exponent to prevent overflow
348+
exponent := math.Min(float64(client.failedAttempts-1), float64(safeShiftLimit))
349+
calculatedMilliseconds := float64(minBackoffDuration.Milliseconds()) * math.Pow(2, exponent)
350+
if calculatedMilliseconds > float64(maxBackoffDuration.Milliseconds()) || calculatedMilliseconds <= 0 {
351+
calculatedMilliseconds = float64(maxBackoffDuration.Milliseconds())
352+
}
353+
354+
calculatedDuration := time.Duration(calculatedMilliseconds) * time.Millisecond
355+
return jitter(calculatedDuration)
356+
}
357+
358+
func jitter(duration time.Duration) time.Duration {
359+
// Calculate the amount of jitter to add to the duration
360+
jitter := float64(duration) * jitterRatio
361+
362+
// Generate a random number between -jitter and +jitter
363+
randomJitter := rand.Float64()*(2*jitter) - jitter
364+
365+
// Apply the random jitter to the original duration
366+
return duration + time.Duration(randomJitter)
367+
}

azureappconfiguration/constants.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,19 @@ const (
5353
// minimalKeyVaultRefreshInterval is the minimum allowed refresh interval for Key Vault references
5454
minimalKeyVaultRefreshInterval time.Duration = 1 * time.Minute
5555
)
56+
57+
// Failover constants
58+
const (
59+
tcpKey string = "tcp"
60+
originKey string = "origin"
61+
altKey string = "alt"
62+
azConfigDomainLabel string = ".azconfig."
63+
appConfigDomainLabel string = ".appconfig."
64+
fallbackClientRefreshExpireInterval time.Duration = time.Hour
65+
minimalClientRefreshInterval time.Duration = time.Second * 30
66+
maxBackoffDuration time.Duration = time.Minute * 10
67+
minBackoffDuration time.Duration = time.Second * 30
68+
failoverTimeout time.Duration = time.Second * 10
69+
jitterRatio float64 = 0.25
70+
safeShiftLimit int = 63
71+
)

0 commit comments

Comments
 (0)