Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions api/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ func clusterManagerGet(state types.State, r *http.Request) types.Response {
return types.SmartError(err)
}

clusterManager, clusterManagerConfig, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}

clusterManagerConfig, err := database.LoadClusterManagerConfigs(state, r.Context(), clusterManager.ID)
if err != nil {
return types.SmartError(err)
}
Expand Down Expand Up @@ -97,7 +102,7 @@ func clusterManagerPost(sh *service.Handler) func(state types.State, r *http.Req
}

// ensure cluster manager is not already configured
existingClusterManager, _, err := database.LoadClusterManager(state, r.Context(), args.Name)
existingClusterManager, err := database.LoadClusterManager(state, r.Context(), args.Name)
if err != nil {
if api.StatusErrorCheck(err, http.StatusNotFound) {
// ignore, this is the expected path
Expand Down Expand Up @@ -163,7 +168,7 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
return types.SmartError(err)
}

clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}
Expand Down Expand Up @@ -191,8 +196,16 @@ func clusterManagerPut(state types.State, r *http.Request) types.Response {
}
}

if args.UpdateInterval != nil {
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateInterval)
if args.UpdateIntervalSeconds != nil {
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.UpdateIntervalSecondsKey, *args.UpdateIntervalSeconds)
if err != nil {
return types.SmartError(err)
}
}

if args.ReverseTunnel != nil {
reverseTunnelValue := strconv.FormatBool(*args.ReverseTunnel)
err = database.StoreClusterManagerConfig(state, r.Context(), name, database.ReverseTunnelKey, reverseTunnelValue)
if err != nil {
return types.SmartError(err)
}
Expand All @@ -215,7 +228,7 @@ func clusterManagerDelete(sh *service.Handler) func(state types.State, r *http.R
return types.SmartError(err)
}

clusterManager, _, err := database.LoadClusterManager(state, r.Context(), name)
clusterManager, err := database.LoadClusterManager(state, r.Context(), name)
if err != nil {
return types.SmartError(err)
}
Expand Down
25 changes: 24 additions & 1 deletion api/types/cluster_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
"net/http"
"time"
)

Expand Down Expand Up @@ -54,7 +55,11 @@ type ClusterManagerPut struct {

// Interval in seconds to send status messages to the cluster manager
// Example: 60
UpdateInterval *string `json:"update_interval" yaml:"update_interval"`
UpdateIntervalSeconds *string `json:"update_interval_seconds" yaml:"update_interval_seconds"`

// Enables or disables the reverse tunnel to the cluster manager
// Example: true, false
ReverseTunnel *bool `json:"reverse_tunnel" yaml:"reverse_tunnel"`
}

// StatusDistribution represents the distribution of items.
Expand Down Expand Up @@ -100,3 +105,21 @@ type ClusterManagerJoin struct {
ClusterCertificate string `json:"cluster_certificate" yaml:"cluster_certificate"`
Token string `json:"token" yaml:"token"`
}

// ClusterManagerTunnelRequest represents the request received through the tunnel.
type ClusterManagerTunnelRequest struct {
UUID string `json:"uuid"`
Method string `json:"method"`
Path string `json:"path"`
Headers http.Header `json:"headers"`
Body []byte `json:"body"`
}

// ClusterManagerTunnelResponse represents the response sent through the tunnel.
type ClusterManagerTunnelResponse struct {
UUID string `json:"uuid"`
Status int `json:"status"`
Headers http.Header `json:"headers"`
Cookies []*http.Cookie `json:"cookies"`
Body []byte `json:"body"`
}
40 changes: 35 additions & 5 deletions client/cluster_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/version"
"github.com/gorilla/websocket"

"github.com/canonical/microcloud/microcloud/api/types"
"github.com/canonical/microcloud/microcloud/database"
Expand Down Expand Up @@ -84,6 +86,26 @@ func (c *ClusterManagerClient) Delete(clusterCert *shared.CertInfo) error {
return err
}

// ConnectTunnel establishes a WebSocket connection to the cluster manager for reverse tunneling.
func (c *ClusterManagerClient) ConnectTunnel(ctx context.Context, clusterCert *shared.CertInfo) (*websocket.Conn, error) {
tlsConfig, address, err := c.getTlsConfig(clusterCert)
if err != nil {
return nil, fmt.Errorf("Failed to get TLS config: %w", err)
}

dialer := websocket.Dialer{
TLSClientConfig: tlsConfig,
}

u := url.URL{Scheme: "wss", Host: address, Path: "/1.0/remote-cluster/ws"}
conn, _, err := dialer.DialContext(ctx, u.String(), nil)
if err != nil {
return nil, err
}

return conn, nil
}

func (c *ClusterManagerClient) craftRequest(method string, path string, reqBody io.Reader) (*http.Request, error) {
url := "https://remote" + path // remote is a placeholder, real address will be set in sendRequest
req, err := http.NewRequest(method, url, reqBody)
Expand Down Expand Up @@ -122,7 +144,19 @@ func (c *ClusterManagerClient) sendRequest(clusterCert *shared.CertInfo, req *ht

func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*http.Client, string, error) {
client := &http.Client{}
tlsConfig, address, err := c.getTlsConfig(clusterCert)
if err != nil {
return nil, "", fmt.Errorf("Failed to get TLS config: %w", err)
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}

return client, address, nil
}

func (c *ClusterManagerClient) getTlsConfig(clusterCert *shared.CertInfo) (*tls.Config, string, error) {
var address string
var remoteCert *x509.Certificate
var err error
Expand Down Expand Up @@ -171,9 +205,5 @@ func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*htt
return &cert, nil
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}

return client, address, nil
return tlsConfig, address, nil
}
58 changes: 39 additions & 19 deletions cmd/microcloud/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/canonical/lxd/shared/api"
cli "github.com/canonical/lxd/shared/cmd"
"github.com/canonical/lxd/shared/validate"
"github.com/canonical/microcluster/v3/microcluster"
microTypes "github.com/canonical/microcluster/v3/microcluster/types"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -201,11 +202,11 @@ func (c *cmdClusterManagerGet) command() *cobra.Command {
cmd.Use = "get"
cmd.Short = "Get specific cluster manager configuration by key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager get addresses
microcloud cluster-manager get certificate-fingerprint
microcloud cluster-manager get update-interval-seconds
microcloud cluster-manager get status-last-success-time
microcloud cluster-manager get status-last-error-time
microcloud cluster-manager get status-last-error-response`)
microcloud cluster-manager get certificate_fingerprint
microcloud cluster-manager get update_interval_seconds
microcloud cluster-manager get status_last_success_time
microcloud cluster-manager get status_last_error_time
microcloud cluster-manager get status_last_error_response`)
Comment thread
edlerd marked this conversation as resolved.

cmd.RunE = c.run

Expand Down Expand Up @@ -233,20 +234,26 @@ func (c *cmdClusterManagerGet) run(_ *cobra.Command, args []string) error {
switch key {
case "addresses":
fmt.Printf("%s\n", strings.Join(clusterManager.Addresses, ", "))
case "certificate-fingerprint":
case "certificate_fingerprint":
fmt.Printf("%s\n", clusterManager.CertificateFingerprint)
case "update-interval-seconds":
case "update_interval_seconds":
value, ok := clusterManager.Config[database.UpdateIntervalSecondsKey]
if ok {
fmt.Printf("%s\n", value)
}

case "status-last-success-time":
case "status_last_success_time":
fmt.Printf("%s\n", clusterManager.StatusLastSuccessTime)
case "status-last-error-time":
case "status_last_error_time":
fmt.Printf("%s\n", clusterManager.StatusLastErrorTime)
case "status-last-error-response":
case "status_last_error_response":
fmt.Printf("%s\n", clusterManager.StatusLastErrorResponse)
case "reverse_tunnel":
value, ok := clusterManager.Config[database.ReverseTunnelKey]
if ok {
fmt.Printf("%s\n", value)
}

default:
return errors.New("Invalid key")
}
Expand All @@ -265,8 +272,9 @@ func (c *cmdClusterManagerSet) command() *cobra.Command {
cmd.Use = "set"
cmd.Short = "Set specific cluster manager configuration key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager set addresses example.com:8443
microcloud cluster-manager set certificate-fingerprint abababababababababababababababababababababababababababababababab
microcloud cluster-manager set update-interval-seconds 50`)
microcloud cluster-manager set certificate_fingerprint abababababababababababababababababababababababababababababababab
microcloud cluster-manager set update_interval_seconds 50
microcloud cluster-manager set reverse_tunnel true`)

cmd.RunE = c.run

Expand All @@ -291,10 +299,18 @@ func (c *cmdClusterManagerSet) run(_ *cobra.Command, args []string) error {
switch key {
case "addresses":
payload.Addresses = []string{value}
case "certificate-fingerprint":
case "certificate_fingerprint":
payload.CertificateFingerprint = &value
case "update-interval-seconds":
payload.UpdateInterval = &value
case "update_interval_seconds":
payload.UpdateIntervalSeconds = &value
case "reverse_tunnel":
err := validate.IsBool(value)
if err != nil {
return errors.New("Invalid value for reverse_tunnel, expected 'true' or 'false'")
}

enabled := value == "true" || value == "yes" || value == "on" || value == "1"
payload.ReverseTunnel = &enabled
default:
return errors.New("Invalid key")
}
Expand All @@ -317,7 +333,7 @@ func (c *cmdClusterManagerUnset) command() *cobra.Command {
cmd := &cobra.Command{}
cmd.Use = "unset"
cmd.Short = "Unset specific cluster manager configuration key."
cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update-interval-seconds`)
cmd.Example = cli.FormatSection("", `microcloud cluster-manager unset update_interval_seconds`)

cmd.RunE = c.run

Expand All @@ -339,9 +355,13 @@ func (c *cmdClusterManagerUnset) run(_ *cobra.Command, args []string) error {
payload := types.ClusterManagerPut{}

switch key {
case "update-interval-seconds":
emptyString := ""
payload.UpdateInterval = &emptyString
case "update_interval_seconds":
payload.UpdateIntervalSeconds = new("")

case "reverse_tunnel":
disabled := false
payload.ReverseTunnel = &disabled

default:
return errors.New("Invalid key")
}
Expand Down
37 changes: 18 additions & 19 deletions cmd/microcloudd/cluster_manager_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
Comment thread
roosterfish marked this conversation as resolved.
"fmt"
"net/http"
"net/url"
"slices"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
microTypes "github.com/canonical/microcluster/v3/microcluster/types"
"golang.org/x/sync/errgroup"

"github.com/canonical/microcloud/microcloud/api/types"
"github.com/canonical/microcloud/microcloud/client"
Expand All @@ -19,8 +21,8 @@ import (
)

// SendClusterManagerStatusMessageTask starts a go routine, that sends periodic status messages to cluster manager.
func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handler, s microTypes.State) {
go func(ctx context.Context, sh *service.Handler, s microTypes.State) {
func SendClusterManagerStatusMessageTask(ctx context.Context, g *errgroup.Group, sh *service.Handler, s microTypes.State) {
g.Go(func() error {
ticker := time.NewTicker(database.UpdateIntervalDefaultSeconds * time.Second)
defer ticker.Stop()

Expand All @@ -33,15 +35,15 @@ func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handle
}

case <-ctx.Done():
return // exit the loop and close the go routine
return nil // exit the loop and close the go routine
}
}
}(ctx, sh, s)
})
}

func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s microTypes.State) time.Duration {
logger.Debug("Starting sendClusterManagerStatusMessage")
var nextUpdate time.Duration = 0
var nextUpdate = time.Duration(database.UpdateIntervalDefaultSeconds) * time.Second

cloud := sh.Services[types.MicroCloud].(*service.CloudService)
isInitialized, err := cloud.IsInitialized(ctx)
Expand All @@ -55,9 +57,9 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
return nextUpdate
}

clusterManager, clusterManagerConfig, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
clusterManager, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName)
if err != nil {
if err.Error() == "Cluster manager not found" {
if api.StatusErrorCheck(err, http.StatusNotFound) {
logger.Debug("Cluster manager not configured, skipping status message")
return nextUpdate
}
Expand All @@ -66,17 +68,14 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
return nextUpdate
}

for _, config := range clusterManagerConfig {
if config.Key == database.UpdateIntervalSecondsKey {
interval, err := time.ParseDuration(config.Value + "s")
if err != nil {
logger.Error("Failed to parse update interval", logger.Ctx{"err": err})
return nextUpdate
}

nextUpdate = interval
break
}
nextUpdateFromDb, err := database.LoadClusterManagerUpdateIntervalSeconds(s, ctx, clusterManager.ID)
if err != nil && api.StatusErrorCheck(err, http.StatusNotFound) {
nextUpdate = time.Duration(database.UpdateIntervalDefaultSeconds) * time.Second
} else if err != nil {
logger.Error("Failed to fetch cluster manager update interval", logger.Ctx{"err": err})
return nextUpdate
} else {
nextUpdate = *nextUpdateFromDb
}

leaderClient, err := s.Database().Leader(ctx)
Expand All @@ -99,7 +98,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s
payload := types.ClusterManagerPostStatus{}

lxdService := sh.Services[types.LXD].(*service.LXDService)
lxdClient, err := lxdService.Client(context.Background())
lxdClient, err := lxdService.Client(ctx)
if err != nil {
logger.Error("Failed to get LXD client", logger.Ctx{"err": err})
return nextUpdate
Expand Down
Loading
Loading