Skip to content

Commit 8c3a9f0

Browse files
authored
Add migration command (#310)
1 parent 13771fa commit 8c3a9f0

63 files changed

Lines changed: 13361 additions & 13 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ generate-v1beta1-serverless-client: install-openapi-generator ## Generate server
5656
@echo "==> Generating serverless cdc client"
5757
rm -rf pkg/tidbcloud/v1beta1/serverless/cdc
5858
cd tools/openapi-generator && npx openapi-generator-cli generate --inline-schema-options RESOLVE_INLINE_ENUMS=true --additional-properties=withGoMod=false,enumClassPrefix=true,disallowAdditionalPropertiesIfNotPresent=false --global-property=apiTests=false,apiDocs=false,modelDocs=false,modelTests=false -i ../../pkg/tidbcloud/v1beta1/serverless/cdc.swagger.json -g go -o ../../pkg/tidbcloud/v1beta1/serverless/cdc --package-name cdc -c go/config.yaml
59+
@echo "==> Generating serverless cdc client"
60+
rm -rf pkg/tidbcloud/v1beta1/serverless/migration
61+
cd tools/openapi-generator && npx openapi-generator-cli generate --inline-schema-options RESOLVE_INLINE_ENUMS=true --additional-properties=withGoMod=false,enumClassPrefix=true,disallowAdditionalPropertiesIfNotPresent=false --global-property=apiTests=false,apiDocs=false,modelDocs=false,modelTests=false -i ../../pkg/tidbcloud/v1beta1/serverless/dm.swagger.json -g go -o ../../pkg/tidbcloud/v1beta1/serverless/migration --package-name migration -c go/config.yaml
5962
cd pkg && go fmt ./tidbcloud/v1beta1/serverless/... && goimports -w .
6063
@echo "==> Generating serverless privatelink client"
6164
rm -rf pkg/tidbcloud/v1beta1/serverless/privatelink

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.0
44

55
require (
66
github.com/AlecAivazis/survey/v2 v2.3.6
7+
github.com/AlekSi/pointer v1.2.0
78
github.com/aws/aws-sdk-go-v2 v1.27.1
89
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.23
910
github.com/charmbracelet/bubbles v0.17.1
@@ -31,6 +32,7 @@ require (
3132
github.com/spf13/pflag v1.0.5
3233
github.com/spf13/viper v1.18.2
3334
github.com/stretchr/testify v1.9.0
35+
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a
3436
github.com/tidbcloud/tidbcloud-cli/pkg v0.0.1
3537
github.com/xo/usql v0.19.2
3638
github.com/zalando/go-keyring v0.2.3

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XB
2727
github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk=
2828
github.com/AlecAivazis/survey/v2 v2.3.6 h1:NvTuVHISgTHEHeBFqt6BHOe4Ny/NwGZr7w+F8S9ziyw=
2929
github.com/AlecAivazis/survey/v2 v2.3.6/go.mod h1:4AuI9b7RjAR+G7v9+C4YSlX/YL3K3cWNXgWXOhllqvI=
30+
github.com/AlekSi/pointer v1.2.0 h1:glcy/gc4h8HnG2Z3ZECSzZ1IX1x2JxRVuDzaJwQE0+w=
31+
github.com/AlekSi/pointer v1.2.0/go.mod h1:gZGfd3dpW4vEc/UlyfKKi1roIqcCgwOIvb0tSNSBle0=
3032
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM=
3133
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo=
3234
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 h1:FDif4R1+UUR+00q6wquyX90K7A8dN+R5E8GEadoP7sU=
@@ -641,6 +643,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
641643
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
642644
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
643645
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
646+
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a h1:a6TNDN9CgG+cYjaeN8l2mc4kSz2iMiCDQxPEyltUV/I=
647+
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a/go.mod h1:EbW0wDK/qEUYI0A5bqq0C2kF8JTQwWONmGDBbzsxxHo=
644648
github.com/thda/tds v0.1.7 h1:s29kbnJK0agL3ps85A/sb9XS2uxgKF5UJ6AZjbyqXX4=
645649
github.com/thda/tds v0.1.7/go.mod h1:isLIF1oZdXfkqVMJM8RyNrsjlHPlTKnPlnsBs7ngZcM=
646650
github.com/trinodb/trino-go-client v0.315.0 h1:9mU+42VGw9Hnp9R1hkhWlIrQp9o+V01Gx1KlHjTkM1c=

internal/cli/serverless/changefeed/list.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package changefeed
1616

1717
import (
1818
"fmt"
19+
"time"
1920

2021
"github.com/juju/errors"
2122
"github.com/spf13/cobra"
@@ -138,7 +139,7 @@ func ListCmd(h *internal.Helper) *cobra.Command {
138139
*item.DisplayName,
139140
string(item.Sink.Type),
140141
string(*item.State),
141-
item.CreateTime.String(),
142+
item.CreateTime.Format(time.RFC3339),
142143
})
143144
}
144145
err := output.PrintHumanTable(h.IOStreams.Out, columns, rows)

internal/cli/serverless/cluster.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/changefeed"
2323
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/dataimport"
2424
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/export"
25+
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/migration"
2526
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/privatelink"
2627
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/sqluser"
2728

@@ -54,6 +55,7 @@ func Cmd(h *internal.Helper) *cobra.Command {
5455
serverlessCmd.AddCommand(authorizednetwork.AuthorizedNetworkCmd(h))
5556
serverlessCmd.AddCommand(changefeed.ChangefeedCmd(h))
5657
serverlessCmd.AddCommand(privatelink.PrivateLinkConnectionCmd(h))
58+
serverlessCmd.AddCommand(migration.MigrationCmd(h))
5759

5860
return serverlessCmd
5961
}
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package migration
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"os"
22+
"slices"
23+
"strings"
24+
"time"
25+
26+
"github.com/AlekSi/pointer"
27+
aws "github.com/aws/aws-sdk-go-v2/aws"
28+
"github.com/fatih/color"
29+
"github.com/juju/errors"
30+
"github.com/spf13/cobra"
31+
"github.com/tailscale/hujson"
32+
33+
"github.com/tidbcloud/tidbcloud-cli/internal"
34+
"github.com/tidbcloud/tidbcloud-cli/internal/config"
35+
"github.com/tidbcloud/tidbcloud-cli/internal/flag"
36+
"github.com/tidbcloud/tidbcloud-cli/internal/output"
37+
"github.com/tidbcloud/tidbcloud-cli/internal/service/cloud"
38+
pkgmigration "github.com/tidbcloud/tidbcloud-cli/pkg/tidbcloud/v1beta1/serverless/migration"
39+
)
40+
41+
func CreateCmd(h *internal.Helper) *cobra.Command {
42+
var cmd = &cobra.Command{
43+
Use: "create",
44+
Short: "Create a migration",
45+
Args: cobra.NoArgs,
46+
Example: fmt.Sprintf(` Create a migration:
47+
$ %[1]s serverless migration create -c <cluster-id> --display-name <name> --config-file <file-path> --dry-run
48+
$ %[1]s serverless migration create -c <cluster-id> --display-name <name> --config-file <file-path>
49+
`, config.CliName),
50+
PreRunE: func(cmd *cobra.Command, args []string) error {
51+
return markCreateMigrationRequiredFlags(cmd)
52+
},
53+
RunE: func(cmd *cobra.Command, args []string) error {
54+
d, err := h.Client()
55+
if err != nil {
56+
return err
57+
}
58+
ctx := cmd.Context()
59+
60+
dryRun, err := cmd.Flags().GetBool(flag.DryRun)
61+
if err != nil {
62+
return errors.Trace(err)
63+
}
64+
clusterID, err := cmd.Flags().GetString(flag.ClusterID)
65+
if err != nil {
66+
return errors.Trace(err)
67+
}
68+
name, err := cmd.Flags().GetString(flag.DisplayName)
69+
if err != nil {
70+
return errors.Trace(err)
71+
}
72+
if strings.TrimSpace(name) == "" {
73+
return errors.New("display name is required")
74+
}
75+
configPath, err := cmd.Flags().GetString(flag.MigrationConfigFile)
76+
if err != nil {
77+
return errors.Trace(err)
78+
}
79+
configPath = strings.TrimSpace(configPath)
80+
if configPath == "" {
81+
return errors.New("config file path is required")
82+
}
83+
definitionBytes, err := os.ReadFile(configPath)
84+
if err != nil {
85+
return errors.Annotatef(err, "failed to read config file %q", configPath)
86+
}
87+
definitionStr := string(definitionBytes)
88+
89+
sources, target, mode, err := parseMigrationDefinition(definitionStr)
90+
if err != nil {
91+
return err
92+
}
93+
94+
if dryRun {
95+
precheckBody := &pkgmigration.MigrationServicePrecheckBody{
96+
DisplayName: name,
97+
Sources: sources,
98+
Target: target,
99+
Mode: mode,
100+
}
101+
return runMigrationPrecheck(ctx, d, clusterID, precheckBody, h)
102+
}
103+
104+
createBody := &pkgmigration.MigrationServiceCreateMigrationBody{
105+
DisplayName: name,
106+
Sources: sources,
107+
Target: target,
108+
Mode: mode,
109+
}
110+
111+
resp, err := d.CreateMigration(ctx, clusterID, createBody)
112+
if err != nil {
113+
return errors.Trace(err)
114+
}
115+
116+
migrationID := aws.ToString(resp.MigrationId)
117+
fmt.Fprintln(h.IOStreams.Out, color.GreenString("migration %s(%s) created", name, migrationID))
118+
return nil
119+
},
120+
}
121+
122+
cmd.Flags().StringP(flag.ClusterID, flag.ClusterIDShort, "", "The ID of the target cluster.")
123+
cmd.Flags().StringP(flag.DisplayName, flag.DisplayNameShort, "", "Display name for the migration.")
124+
cmd.Flags().String(flag.MigrationConfigFile, "", "Path to a migration config JSON file. Use \"ticloud serverless migration template --mode <mode>\" to print templates.")
125+
cmd.Flags().Bool(flag.DryRun, false, "Run a migration precheck (dry run) with the provided inputs without creating a migration.")
126+
127+
return cmd
128+
}
129+
130+
func markCreateMigrationRequiredFlags(cmd *cobra.Command) error {
131+
for _, fn := range []string{flag.ClusterID, flag.DisplayName, flag.MigrationConfigFile} {
132+
if err := cmd.MarkFlagRequired(fn); err != nil {
133+
return err
134+
}
135+
}
136+
return nil
137+
}
138+
139+
const (
140+
precheckPollInterval = 5 * time.Second
141+
precheckPollTimeout = 2 * time.Minute
142+
)
143+
144+
func runMigrationPrecheck(ctx context.Context, client cloud.TiDBCloudClient, clusterID string, body *pkgmigration.MigrationServicePrecheckBody, h *internal.Helper) error {
145+
resp, err := client.CreateMigrationPrecheck(ctx, clusterID, body)
146+
if err != nil {
147+
return errors.Trace(err)
148+
}
149+
if resp.PrecheckId == nil || *resp.PrecheckId == "" {
150+
return errors.New("precheck created but ID is empty")
151+
}
152+
precheckID := *resp.PrecheckId
153+
fmt.Fprintf(h.IOStreams.Out, "migration precheck %s created, polling results...\n", precheckID)
154+
155+
ticker := time.NewTicker(precheckPollInterval)
156+
defer ticker.Stop()
157+
pollCtx, cancel := context.WithTimeout(ctx, precheckPollTimeout)
158+
defer cancel()
159+
160+
// Poll precheck status until it finishes or the overall timeout is hit.
161+
for {
162+
select {
163+
case <-pollCtx.Done():
164+
if pollCtx.Err() == context.DeadlineExceeded {
165+
return errors.Errorf("migration precheck polling timed out after %s", precheckPollTimeout)
166+
}
167+
return pollCtx.Err()
168+
case <-ticker.C:
169+
result, err := client.GetMigrationPrecheck(pollCtx, clusterID, precheckID)
170+
if err != nil {
171+
return errors.Trace(err)
172+
}
173+
finished, err := printPrecheckSummary(result, h)
174+
if err != nil {
175+
return err
176+
}
177+
if !finished {
178+
continue
179+
}
180+
if result.GetStatus() == pkgmigration.MIGRATIONPRECHECKSTATUS_FAILED {
181+
fmt.Fprintln(h.IOStreams.Out, color.RedString("migration precheck %s failed", precheckID))
182+
return errors.New("migration precheck failed")
183+
}
184+
fmt.Fprintln(h.IOStreams.Out, color.GreenString("migration precheck %s passed", precheckID))
185+
return nil
186+
}
187+
}
188+
}
189+
190+
func isPrecheckUnfinished(status pkgmigration.MigrationPrecheckStatus) bool {
191+
switch status {
192+
case pkgmigration.MIGRATIONPRECHECKSTATUS_PENDING,
193+
pkgmigration.MIGRATIONPRECHECKSTATUS_RUNNING:
194+
return true
195+
default:
196+
return false
197+
}
198+
}
199+
200+
func printPrecheckSummary(result *pkgmigration.MigrationPrecheck, h *internal.Helper) (bool, error) {
201+
if isPrecheckUnfinished(result.GetStatus()) {
202+
fmt.Fprintf(h.IOStreams.Out, "precheck %s summary (status %s)\n", result.GetPrecheckId(), result.GetStatus())
203+
fmt.Fprintf(h.IOStreams.Out, "Total: %d, Success: %d, Warn: %d, Failed: %d\n",
204+
aws.ToInt32(result.Total), aws.ToInt32(result.SuccessCnt), aws.ToInt32(result.WarnCnt), aws.ToInt32(result.FailedCnt))
205+
return false, nil
206+
}
207+
208+
fmt.Fprintf(h.IOStreams.Out, "precheck %s finished with status %s\n", result.GetPrecheckId(), result.GetStatus())
209+
fmt.Fprintf(h.IOStreams.Out, "Total: %d, Success: %d, Warn: %d, Failed: %d\n",
210+
aws.ToInt32(result.Total), aws.ToInt32(result.SuccessCnt), aws.ToInt32(result.WarnCnt), aws.ToInt32(result.FailedCnt))
211+
if len(result.Items) == 0 {
212+
return true, nil
213+
}
214+
columns := []output.Column{"Type", "Status", "Description", "Reason", "Solution"}
215+
rows := make([]output.Row, 0, len(result.Items))
216+
for _, item := range result.Items {
217+
if !shouldPrintPrecheckItem(item.Status) {
218+
continue
219+
}
220+
rows = append(rows, output.Row{
221+
string(pointer.Get(item.Type)),
222+
string(pointer.Get(item.Status)),
223+
pointer.Get(item.Description),
224+
pointer.Get(item.Reason),
225+
pointer.Get(item.Solution),
226+
})
227+
}
228+
if len(rows) == 0 {
229+
return true, nil
230+
}
231+
return true, output.PrintHumanTable(h.IOStreams.Out, columns, rows)
232+
}
233+
234+
// shouldPrintPrecheckItem reports whether a precheck item should be shown to users.
235+
// Currently only WARNING and FAILED statuses surface because SUCCESS does not
236+
// provide actionable information.
237+
func shouldPrintPrecheckItem(status *pkgmigration.PrecheckItemStatus) bool {
238+
if status == nil {
239+
return false
240+
}
241+
switch *status {
242+
case pkgmigration.PRECHECKITEMSTATUS_WARNING,
243+
pkgmigration.PRECHECKITEMSTATUS_FAILED:
244+
return true
245+
default:
246+
return false
247+
}
248+
}
249+
250+
func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, error) {
251+
trimmed := strings.TrimSpace(value)
252+
if trimmed == "" {
253+
return nil, pkgmigration.Target{}, "", errors.New("migration config is required; use --config-file")
254+
}
255+
var payload struct {
256+
Sources []pkgmigration.Source `json:"sources"`
257+
Target *pkgmigration.Target `json:"target"`
258+
Mode string `json:"mode"`
259+
}
260+
stdJson, err := standardizeJSON([]byte(trimmed))
261+
if err != nil {
262+
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
263+
}
264+
if err := json.Unmarshal(stdJson, &payload); err != nil {
265+
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
266+
}
267+
if len(payload.Sources) == 0 {
268+
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include at least one source")
269+
}
270+
if payload.Target == nil {
271+
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include the target block")
272+
}
273+
mode, err := parseMigrationMode(payload.Mode)
274+
if err != nil {
275+
return nil, pkgmigration.Target{}, "", err
276+
}
277+
return payload.Sources, *payload.Target, mode, nil
278+
}
279+
280+
func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
281+
trimmed := strings.TrimSpace(value)
282+
if trimmed == "" {
283+
return "", errors.New("empty config file")
284+
}
285+
normalized := strings.ToUpper(trimmed)
286+
mode := pkgmigration.TaskMode(normalized)
287+
if slices.Contains(pkgmigration.AllowedTaskModeEnumValues, mode) {
288+
return mode, nil
289+
}
290+
return "", errors.Errorf("invalid mode %q, allowed values: %s", value, pkgmigration.AllowedTaskModeEnumValues)
291+
}
292+
293+
// standardizeJSON accepts JSON With Commas and Comments(JWCC) see
294+
// https://nigeltao.github.io/blog/2021/json-with-commas-comments.html) and
295+
// returns a standard JSON byte slice ready for json.Unmarshal.
296+
func standardizeJSON(b []byte) ([]byte, error) {
297+
ast, err := hujson.Parse(b)
298+
if err != nil {
299+
return b, err
300+
}
301+
ast.Standardize()
302+
return ast.Pack(), nil
303+
}

0 commit comments

Comments
 (0)