Skip to content

Commit c5374cf

Browse files
committed
OCPEDGE-2038: Add etcd client helper function
1 parent 9569c75 commit c5374cf

1 file changed

Lines changed: 84 additions & 0 deletions

File tree

pkg/controllers/etcd.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@ import (
1919
"context"
2020
"errors"
2121
"fmt"
22+
"net"
2223
"os"
2324
"os/exec"
2425
"path/filepath"
2526
"time"
2627

2728
"github.com/openshift/microshift/pkg/config"
2829
"github.com/openshift/microshift/pkg/util/cryptomaterial"
30+
corev1 "k8s.io/api/core/v1"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/client-go/kubernetes"
33+
"k8s.io/client-go/tools/clientcmd"
2934
klog "k8s.io/klog/v2"
3035

3136
"go.etcd.io/etcd/client/pkg/v3/transport"
@@ -251,3 +256,82 @@ func getEtcdClient(ctx context.Context) (*clientv3.Client, error) {
251256
}
252257
return cli, nil
253258
}
259+
260+
// GetClusterEtcdClient creates a new etcd client for the cluster.
261+
// It uses the kubeconfig to list the nodes in the cluster to test which ones are learners
262+
// and then creates a new client with voting members only.
263+
func GetClusterEtcdClient(ctx context.Context, kubeConfigPath string) (*clientv3.Client, error) {
264+
certsDir := cryptomaterial.CertsDirectory(config.DataDir)
265+
etcdAPIServerClientCertDir := cryptomaterial.EtcdAPIServerClientCertDir(certsDir)
266+
267+
tlsInfo := transport.TLSInfo{
268+
CertFile: cryptomaterial.ClientCertPath(etcdAPIServerClientCertDir),
269+
KeyFile: cryptomaterial.ClientKeyPath(etcdAPIServerClientCertDir),
270+
TrustedCAFile: cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir)),
271+
}
272+
tlsConfig, err := tlsInfo.ClientConfig()
273+
if err != nil {
274+
return nil, err
275+
}
276+
277+
client, err := clientv3.New(clientv3.Config{
278+
Endpoints: []string{"https://localhost:2379"},
279+
DialTimeout: 5 * time.Second,
280+
TLS: tlsConfig,
281+
Context: ctx,
282+
})
283+
if err != nil {
284+
return nil, err
285+
}
286+
defer func() { _ = client.Close() }()
287+
288+
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
289+
if err != nil {
290+
return nil, fmt.Errorf("failed to load kubeconfig from %s: %v", kubeConfigPath, err)
291+
}
292+
adminClient, err := kubernetes.NewForConfig(restConfig)
293+
if err != nil {
294+
return nil, fmt.Errorf("failed to create admin kubernetes client: %w", err)
295+
}
296+
297+
nodes, err := adminClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
298+
if err != nil {
299+
return nil, fmt.Errorf("failed to list nodes: %w", err)
300+
}
301+
302+
var memberEndpoints []string
303+
for _, node := range nodes.Items {
304+
var nodeIP string
305+
for _, addr := range node.Status.Addresses {
306+
if addr.Type == corev1.NodeInternalIP {
307+
nodeIP = addr.Address
308+
break
309+
}
310+
}
311+
if nodeIP == "" {
312+
continue
313+
}
314+
endpoint := net.JoinHostPort(nodeIP, "2379")
315+
status, err := client.Status(ctx, endpoint)
316+
if err != nil {
317+
continue
318+
}
319+
if status != nil && !status.IsLearner {
320+
memberEndpoints = append(memberEndpoints, fmt.Sprintf("https://%s", endpoint))
321+
}
322+
}
323+
if len(memberEndpoints) == 0 {
324+
memberEndpoints = []string{"https://localhost:2379"}
325+
}
326+
327+
clusterClient, err := clientv3.New(clientv3.Config{
328+
Endpoints: memberEndpoints,
329+
DialTimeout: 5 * time.Second,
330+
TLS: tlsConfig,
331+
Context: ctx,
332+
})
333+
if err != nil {
334+
return nil, err
335+
}
336+
return clusterClient, nil
337+
}

0 commit comments

Comments
 (0)