diff --git a/copy/boundary_normalize.go b/copy/boundary_normalize.go new file mode 100644 index 0000000..790b524 --- /dev/null +++ b/copy/boundary_normalize.go @@ -0,0 +1,168 @@ +package copy + +import ( + "fmt" + "regexp" + "sort" + "strings" +) + +// normalizeBoundary converts a partition boundary string from either the GP6 +// pg_get_partition_rule_def format or the GP7/CBDB pg_get_expr format into a +// canonical string that compares equal across database versions when the +// underlying partition bounds are semantically identical. +// +// GP6 "START ('2024-01-01'::date) END ('2024-02-01'::date) EVERY ('1 mon'::interval) WITH (...)" +// GP6 "PARTITION jan START ('2024-01-01'::date) END ('2024-02-01'::date) WITH (...)" +// GP7 "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')" +// all → "RANGE:[2024-01-01,2024-02-01)" +// +// Returns "" for empty input. Unrecognised input is returned unchanged so +// that it can never accidentally match a known-format boundary. +func normalizeBoundary(s string) string { + s = strings.TrimSpace(s) + if s == "" { + return "" + } + if s == "DEFAULT" { + return "DEFAULT" + } + upper := strings.ToUpper(s) + if strings.HasPrefix(upper, "FOR VALUES") { + return normalizeGP7Boundary(s) + } + // GP6 pg_get_partition_rule_def output comes in several forms: + // "PARTITION START ..." (named partitions) + // "START ... END ... EVERY ..." (EVERY-generated, no PARTITION prefix) + // "DEFAULT PARTITION ..." (default partitions) + // "SUBPARTITION VALUES ..." (sub-partitions) + // "DEFAULT SUBPARTITION ..." (default sub-partitions) + // "PARTITION VALUES ..." (list partitions) + // Dispatch broadly — normalizeGP6RuleDef returns s unchanged if no regex matches. + if strings.HasPrefix(upper, "PARTITION ") || strings.HasPrefix(upper, "DEFAULT") || + strings.HasPrefix(upper, "START") || strings.HasPrefix(upper, "SUBPARTITION ") || + strings.Contains(upper, " START (") || strings.Contains(upper, "VALUES") { + return normalizeGP6RuleDef(s) + } + return s +} + +var ( + // GP7/CBDB: pg_get_expr output. + // NOTE: currently handles single-column partition bounds only. + // Multi-column bounds (e.g. FROM (1, 'a') TO (2, 'b')) will not + // normalise and will be returned as-is (fails safe — no silent mismatch). + reGP7Range = regexp.MustCompile(`(?i)^FOR\s+VALUES\s+FROM\s+\((.+?)\)\s+TO\s+\((.+?)\)$`) + reGP7List = regexp.MustCompile(`(?i)^FOR\s+VALUES\s+IN\s+\((.+)\)$`) + reGP7Hash = regexp.MustCompile(`(?i)^FOR\s+VALUES\s+WITH\s+\(modulus\s+(\d+),\s*remainder\s+(\d+)\)$`) + + // GP6: pg_get_partition_rule_def output. + // Applied after preprocessing strips EVERY (...), WITH (...), and type casts. + // After preprocessing, bounds are clean: START ('val') or START (val) + reGP6RuleRange = regexp.MustCompile(`(?i)START\s*\(\s*'?(\(?[^')]+?\)?)'?\s*\)\s*(INCLUSIVE|EXCLUSIVE)?\s*END\s*\(\s*'?(\(?[^')]+?\)?)'?\s*\)\s*(INCLUSIVE|EXCLUSIVE)?`) + reGP6RuleList = regexp.MustCompile(`(?i)\bVALUES\s*\(([^)]+)\)`) + + // Preprocess helpers for GP6 boundary strings. + reStripEvery = regexp.MustCompile(`(?i)\s+EVERY\s*\([^)]*\)`) + reStripWith = regexp.MustCompile(`(?i)\s+WITH\s*\(.*$`) + // Strip type casts: ::date, ::bigint, ::timestamp without time zone, ::character(3), etc. + reStripCast = regexp.MustCompile(`::[\w][\w ]*(?:\(\d+\))?`) +) + +// normalizeGP6RuleDef parses the output of pg_get_partition_rule_def(oid, true) +// used as the boundary field for GP6 (Greenplum 6 / HashData 3.x) partitions. +// +// Recognised formats (after preprocessing strips EVERY/WITH suffixes): +// +// RANGE: "[PARTITION ] START ([::type]) [INCLUSIVE|EXCLUSIVE] END ([::type]) [INCLUSIVE|EXCLUSIVE]" +// LIST: "[PARTITION|SUBPARTITION ] VALUES(, )" +// DEFAULT: "DEFAULT [SUB]PARTITION ..." +func normalizeGP6RuleDef(s string) string { + upper := strings.ToUpper(s) + + // DEFAULT / DEFAULT SUBPARTITION + if strings.HasPrefix(upper, "DEFAULT ") { + return "DEFAULT" + } + + // Preprocess: strip trailing EVERY (...), WITH (...), and type casts to simplify regex. + cleaned := reStripEvery.ReplaceAllString(s, "") + cleaned = reStripWith.ReplaceAllString(cleaned, "") + cleaned = reStripCast.ReplaceAllString(cleaned, "") + + // RANGE: START ... END ... + if m := reGP6RuleRange.FindStringSubmatch(cleaned); len(m) >= 4 { + start := stripBoundaryValue(m[1]) + end := stripBoundaryValue(m[3]) + startMod := strings.ToUpper(strings.TrimSpace(m[2])) + endMod := strings.ToUpper(strings.TrimSpace(m[4])) + // GP6 default: START inclusive, END exclusive (matches GP7 FROM..TO semantics) + lb, rb := "[", ")" + if startMod == "EXCLUSIVE" { + lb = "(" + } + if endMod == "INCLUSIVE" { + rb = "]" + } + return fmt.Sprintf("RANGE:%s%s,%s%s", lb, start, end, rb) + } + + // LIST: VALUES(...) + if m := reGP6RuleList.FindStringSubmatch(cleaned); len(m) == 2 { + vals := splitListValues(m[1]) + sort.Strings(vals) + return fmt.Sprintf("LIST:(%s)", strings.Join(vals, ",")) + } + + return s +} + +func normalizeGP7Boundary(s string) string { + if m := reGP7Range.FindStringSubmatch(s); len(m) == 3 { + start := stripSingleQuotes(strings.TrimSpace(m[1])) + end := stripSingleQuotes(strings.TrimSpace(m[2])) + // GP7 FROM..TO is always inclusive-start, exclusive-end + return fmt.Sprintf("RANGE:[%s,%s)", start, end) + } + if m := reGP7List.FindStringSubmatch(s); len(m) == 2 { + vals := splitListValues(m[1]) + sort.Strings(vals) + return fmt.Sprintf("LIST:(%s)", strings.Join(vals, ",")) + } + if m := reGP7Hash.FindStringSubmatch(s); len(m) == 3 { + return fmt.Sprintf("HASH:%s:%s", m[1], m[2]) + } + return s +} + +// stripBoundaryValue cleans a captured boundary value: +// - strips outer parentheses for negative numbers: (-100) → -100 +// - strips single quotes: '2024-01-01' → 2024-01-01 +func stripBoundaryValue(s string) string { + s = strings.TrimSpace(s) + // Strip outer parens (GP6 wraps negative numbers: ((-100)) → captured as (-100)) + for len(s) >= 2 && s[0] == '(' && s[len(s)-1] == ')' { + s = s[1 : len(s)-1] + } + return stripSingleQuotes(s) +} + +// splitListValues splits "val1, val2, ..." and strips surrounding single quotes. +func splitListValues(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + v := stripSingleQuotes(strings.TrimSpace(p)) + if v != "" { + out = append(out, v) + } + } + return out +} + +func stripSingleQuotes(s string) string { + if len(s) >= 2 && s[0] == '\'' && s[len(s)-1] == '\'' { + return s[1 : len(s)-1] + } + return s +} diff --git a/copy/boundary_normalize_test.go b/copy/boundary_normalize_test.go new file mode 100644 index 0000000..af681f1 --- /dev/null +++ b/copy/boundary_normalize_test.go @@ -0,0 +1,297 @@ +package copy + +import "testing" + +func TestNormalizeBoundary_Empty(t *testing.T) { + if got := normalizeBoundary(""); got != "" { + t.Errorf("expected empty, got %q", got) + } + if got := normalizeBoundary(" "); got != "" { + t.Errorf("expected empty for whitespace, got %q", got) + } +} + +// ---- GP7/CBDB: pg_get_expr ---- + +func TestNormalizeBoundary_GP7_Range_Date(t *testing.T) { + in := "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')" + want := "RANGE:[2024-01-01,2024-02-01)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP7_Range_Int(t *testing.T) { + in := "FOR VALUES FROM (1) TO (1000)" + want := "RANGE:[1,1000)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP7_List(t *testing.T) { + in := "FOR VALUES IN ('apple', 'banana')" + want := "LIST:(apple,banana)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP7_List_Sorted(t *testing.T) { + in := "FOR VALUES IN ('zebra', 'apple')" + want := "LIST:(apple,zebra)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP7_Hash(t *testing.T) { + in := "FOR VALUES WITH (modulus 4, remainder 0)" + want := "HASH:4:0" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP7_Default(t *testing.T) { + if got := normalizeBoundary("DEFAULT"); got != "DEFAULT" { + t.Errorf("got %q, want DEFAULT", got) + } +} + +// ---- GP6: pg_get_partition_rule_def (named partitions) ---- + +func TestNormalizeBoundary_GP6_Range_Date(t *testing.T) { + in := "PARTITION p1 START ('2024-01-01'::date) END ('2024-02-01'::date) WITH (appendonly='true')" + want := "RANGE:[2024-01-01,2024-02-01)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_Range_Int(t *testing.T) { + in := "PARTITION p1 START ('1'::integer) END ('1000'::integer) WITH (appendonly='true')" + want := "RANGE:[1,1000)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_Range_ExclusiveStart(t *testing.T) { + in := "PARTITION p1 START ('100') EXCLUSIVE END ('200') WITH (appendonly='true')" + want := "RANGE:(100,200)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_Range_InclusiveEnd(t *testing.T) { + in := "PARTITION p1 START ('1') END ('10') INCLUSIVE WITH (appendonly='true')" + want := "RANGE:[1,10]" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_List(t *testing.T) { + in := "PARTITION pa VALUES('apple', 'banana') WITH (appendonly='true')" + want := "LIST:(apple,banana)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_List_Sorted(t *testing.T) { + in := "PARTITION pa VALUES('zebra', 'apple') WITH (appendonly='true')" + want := "LIST:(apple,zebra)" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestNormalizeBoundary_GP6_Default(t *testing.T) { + in := "DEFAULT PARTITION pother WITH (appendonly='true')" + want := "DEFAULT" + if got := normalizeBoundary(in); got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +// ---- Real GP6 boundary strings (verbatim from pg_get_partition_rule_def on GP6 6.27.1 / HashData 3.13.33) ---- + +func TestNormalizeBoundary_RealGP6_AllTypes(t *testing.T) { + cases := []struct { + label string + input string + want string + }{ + // date + EVERY + {"date EVERY", "START ('2024-01-01'::date) END ('2024-02-01'::date) EVERY ('1 mon'::interval) WITH (appendonly='true', blocksize='1048576')", "RANGE:[2024-01-01,2024-02-01)"}, + // date + named + {"date named", "PARTITION jan START ('2024-01-01'::date) END ('2024-02-01'::date) WITH (appendonly='true', blocksize='1048576')", "RANGE:[2024-01-01,2024-02-01)"}, + // int4 + EVERY (unquoted) + {"int4 EVERY", "START (0) END (100) EVERY (100) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // int4 + named (unquoted) + {"int4 named", "PARTITION p1 START (0) END (100) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // int4 + INCLUSIVE + {"int4 INCLUSIVE", "PARTITION p2 START (100) END (200) INCLUSIVE WITH (appendonly='true', blocksize='1048576')", "RANGE:[100,200]"}, + // int4 negative (double parens) + {"int4 negative", "PARTITION pn START ((-100)) END (0) WITH (appendonly='true', blocksize='1048576')", "RANGE:[-100,0)"}, + // bigint + EVERY (unquoted with cast) + {"bigint EVERY", "START (0::bigint) END (100::bigint) EVERY (100::bigint) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // bigint + named + {"bigint named", "PARTITION p1 START (0::bigint) END (100::bigint) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // bigint negative + {"bigint negative", "PARTITION pn START ((-9999999999)::bigint) END (0::bigint) WITH (appendonly='true', blocksize='1048576')", "RANGE:[-9999999999,0)"}, + // smallint + EVERY + {"smallint EVERY", "START (0::smallint) END (100::smallint) EVERY (100::smallint) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // numeric + EVERY (unquoted) + {"numeric EVERY", "START (0.0) END (10.0) EVERY (10.0) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0.0,10.0)"}, + // numeric + named + {"numeric named", "PARTITION p1 START (0.0) END (10.0) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0.0,10.0)"}, + // real (float4) + named (unquoted with cast) + {"real named", "PARTITION p1 START (0::real) END (10::real) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,10)"}, + // double precision + named (multi-word cast) + {"double named", "PARTITION p1 START (0::double precision) END (100::double precision) WITH (appendonly='true', blocksize='1048576')", "RANGE:[0,100)"}, + // text + named (quoted with cast) + {"text named", "PARTITION pa START ('A'::text) END ('M'::text) WITH (appendonly='true', blocksize='1048576')", "RANGE:[A,M)"}, + // char(3) + named (multi-char cast with parens) + {"char(3) named", "PARTITION pa START ('AAA'::character(3)) END ('MMM'::character(3)) WITH (appendonly='true', blocksize='1048576')", "RANGE:[AAA,MMM)"}, + // timestamp without time zone + EVERY (multi-word cast) + {"timestamp EVERY", "START ('2024-01-01 00:00:00'::timestamp without time zone) END ('2024-02-01 00:00:00'::timestamp without time zone) EVERY ('1 mon'::interval) WITH (appendonly='true', blocksize='1048576')", "RANGE:[2024-01-01 00:00:00,2024-02-01 00:00:00)"}, + // timestamp without time zone + named + {"timestamp named", "PARTITION jan START ('2024-01-01 00:00:00'::timestamp without time zone) END ('2024-02-01 00:00:00'::timestamp without time zone) WITH (appendonly='true', blocksize='1048576')", "RANGE:[2024-01-01 00:00:00,2024-02-01 00:00:00)"}, + // timestamp with time zone + named + {"timestamptz named", "PARTITION jan START ('2024-01-01 00:00:00+08'::timestamp with time zone) END ('2024-02-01 00:00:00+08'::timestamp with time zone) WITH (appendonly='true', blocksize='1048576')", "RANGE:[2024-01-01 00:00:00+08,2024-02-01 00:00:00+08)"}, + // LIST text + {"list text", "PARTITION pa VALUES('beijing', 'shanghai') WITH (appendonly='true', blocksize='1048576')", "LIST:(beijing,shanghai)"}, + // LIST text with spaces + {"list text spaces", "PARTITION pa VALUES('New York', 'Los Angeles') WITH (appendonly='true', blocksize='1048576')", "LIST:(Los Angeles,New York)"}, + // LIST int (unquoted) + {"list int", "PARTITION active VALUES(1, 2, 3) WITH (appendonly='true', blocksize='1048576')", "LIST:(1,2,3)"}, + // LIST char(1) + {"list char", "PARTITION py VALUES('Y') WITH (appendonly='true', blocksize='1048576')", "LIST:(Y)"}, + // DEFAULT + {"default", "DEFAULT PARTITION pother WITH (appendonly='true', blocksize='1048576')", "DEFAULT"}, + // DEFAULT SUBPARTITION + {"default subpartition", "DEFAULT SUBPARTITION other WITH (appendonly='true', blocksize='1048576')", "DEFAULT"}, + // SUBPARTITION LIST + {"subpartition list", "SUBPARTITION east VALUES('east') WITH (appendonly='true', blocksize='1048576')", "LIST:(east)"}, + // ALTER TABLE ADD PARTITION (same format as named) + {"alter add", "PARTITION p2 START (100) END (200) WITH (appendonly='true', blocksize='1048576')", "RANGE:[100,200)"}, + // SPLIT PARTITION (same format as named) + {"split", "PARTITION p2a START (100) END (200) WITH (appendonly='true', blocksize='1048576')", "RANGE:[100,200)"}, + } + for _, c := range cases { + if got := normalizeBoundary(c.input); got != c.want { + t.Errorf("[%s] got %q, want %q\n input: %s", c.label, got, c.want, c.input) + } + } +} + +// ---- Real GP7 boundary strings (verbatim from pg_get_expr on GP7 7.1.0) ---- + +func TestNormalizeBoundary_RealGP7_AllTypes(t *testing.T) { + cases := []struct { + label string + input string + want string + }{ + {"date", "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')", "RANGE:[2024-01-01,2024-02-01)"}, + {"int4", "FOR VALUES FROM (0) TO (100)", "RANGE:[0,100)"}, + {"int4 negative", "FOR VALUES FROM ('-100') TO (0)", "RANGE:[-100,0)"}, + {"int4 neg large", "FOR VALUES FROM ('-9999') TO (0)", "RANGE:[-9999,0)"}, + {"bigint", "FOR VALUES FROM ('0') TO ('100')", "RANGE:[0,100)"}, + {"bigint negative", "FOR VALUES FROM ('-9999999999') TO ('0')", "RANGE:[-9999999999,0)"}, + {"smallint", "FOR VALUES FROM ('0') TO ('100')", "RANGE:[0,100)"}, + {"numeric", "FOR VALUES FROM (0.0) TO (10.0)", "RANGE:[0.0,10.0)"}, + {"real", "FOR VALUES FROM ('0') TO ('10')", "RANGE:[0,10)"}, + {"double", "FOR VALUES FROM ('0') TO ('100')", "RANGE:[0,100)"}, + {"text", "FOR VALUES FROM ('A') TO ('M')", "RANGE:[A,M)"}, + {"char(3)", "FOR VALUES FROM ('AAA') TO ('MMM')", "RANGE:[AAA,MMM)"}, + {"timestamp", "FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2024-02-01 00:00:00')", "RANGE:[2024-01-01 00:00:00,2024-02-01 00:00:00)"}, + {"timestamptz", "FOR VALUES FROM ('2024-01-01 00:00:00+08') TO ('2024-02-01 00:00:00+08')", "RANGE:[2024-01-01 00:00:00+08,2024-02-01 00:00:00+08)"}, + {"list text", "FOR VALUES IN ('beijing', 'shanghai')", "LIST:(beijing,shanghai)"}, + {"list text spaces", "FOR VALUES IN ('New York', 'Los Angeles')", "LIST:(Los Angeles,New York)"}, + {"list int", "FOR VALUES IN (1, 2, 3)", "LIST:(1,2,3)"}, + {"list boolean", "FOR VALUES IN (true)", "LIST:(true)"}, + {"list NULL", "FOR VALUES IN (NULL)", "LIST:(NULL)"}, + {"default", "DEFAULT", "DEFAULT"}, + {"MINVALUE", "FOR VALUES FROM (MINVALUE) TO (0)", "RANGE:[MINVALUE,0)"}, + {"MAXVALUE", "FOR VALUES FROM (100) TO (MAXVALUE)", "RANGE:[100,MAXVALUE)"}, + {"hash", "FOR VALUES WITH (modulus 3, remainder 0)", "HASH:3:0"}, + } + for _, c := range cases { + if got := normalizeBoundary(c.input); got != c.want { + t.Errorf("[%s] got %q, want %q\n input: %s", c.label, got, c.want, c.input) + } + } +} + +// ---- Real CBDB boundary strings (verbatim from pg_get_expr on CBDB 2.4.0) ---- +// Verified identical to GP7 output for all tested types. + +func TestNormalizeBoundary_RealCBDB_AllTypes(t *testing.T) { + cases := []struct { + label string + input string + want string + }{ + {"date", "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')", "RANGE:[2024-01-01,2024-02-01)"}, + {"int4", "FOR VALUES FROM (0) TO (100)", "RANGE:[0,100)"}, + {"bigint", "FOR VALUES FROM ('0') TO ('100')", "RANGE:[0,100)"}, + {"numeric", "FOR VALUES FROM (0.0) TO (10.0)", "RANGE:[0.0,10.0)"}, + {"timestamp", "FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2024-02-01 00:00:00')", "RANGE:[2024-01-01 00:00:00,2024-02-01 00:00:00)"}, + {"timestamptz", "FOR VALUES FROM ('2024-01-01 00:00:00+08') TO ('2024-02-01 00:00:00+08')", "RANGE:[2024-01-01 00:00:00+08,2024-02-01 00:00:00+08)"}, + {"list text", "FOR VALUES IN ('beijing', 'shanghai')", "LIST:(beijing,shanghai)"}, + {"list int", "FOR VALUES IN (1, 2, 3)", "LIST:(1,2,3)"}, + {"default", "DEFAULT", "DEFAULT"}, + {"multi-col range", "FOR VALUES FROM (0, 0) TO (10, 100)", "RANGE:[0, 0,10, 100)"}, + } + for _, c := range cases { + if got := normalizeBoundary(c.input); got != c.want { + t.Errorf("[%s] got %q, want %q\n input: %s", c.label, got, c.want, c.input) + } + } +} + +// ---- Cross-version pairing: GP6 ↔ GP7/CBDB ---- + +func TestNormalizeBoundary_CrossVersion_AllTypes(t *testing.T) { + cases := []struct { + label string + gp6 string + gp7 string // GP7 = CBDB (verified identical) + }{ + {"date EVERY", "START ('2024-01-01'::date) END ('2024-02-01'::date) EVERY ('1 mon'::interval) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"}, + {"date named", "PARTITION jan START ('2024-01-01'::date) END ('2024-02-01'::date) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"}, + {"int4 EVERY", "START (0) END (100) EVERY (100) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM (0) TO (100)"}, + {"int4 named", "PARTITION p1 START (0) END (100) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM (0) TO (100)"}, + {"int4 negative", "PARTITION pn START ((-100)) END (0) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('-100') TO (0)"}, + {"bigint EVERY", "START (0::bigint) END (100::bigint) EVERY (100::bigint) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('0') TO ('100')"}, + {"bigint named", "PARTITION p1 START (0::bigint) END (100::bigint) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('0') TO ('100')"}, + {"bigint negative", "PARTITION pn START ((-9999999999)::bigint) END (0::bigint) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('-9999999999') TO ('0')"}, + {"smallint EVERY", "START (0::smallint) END (100::smallint) EVERY (100::smallint) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('0') TO ('100')"}, + {"numeric EVERY", "START (0.0) END (10.0) EVERY (10.0) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM (0.0) TO (10.0)"}, + {"numeric named", "PARTITION p1 START (0.0) END (10.0) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM (0.0) TO (10.0)"}, + {"real named", "PARTITION p1 START (0::real) END (10::real) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('0') TO ('10')"}, + {"double named", "PARTITION p1 START (0::double precision) END (100::double precision) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('0') TO ('100')"}, + {"text named", "PARTITION pa START ('A'::text) END ('M'::text) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('A') TO ('M')"}, + {"char(3) named", "PARTITION pa START ('AAA'::character(3)) END ('MMM'::character(3)) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('AAA') TO ('MMM')"}, + {"timestamp EVERY", "START ('2024-01-01 00:00:00'::timestamp without time zone) END ('2024-02-01 00:00:00'::timestamp without time zone) EVERY ('1 mon'::interval) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2024-02-01 00:00:00')"}, + {"timestamp named", "PARTITION jan START ('2024-01-01 00:00:00'::timestamp without time zone) END ('2024-02-01 00:00:00'::timestamp without time zone) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2024-02-01 00:00:00')"}, + {"timestamptz named", "PARTITION jan START ('2024-01-01 00:00:00+08'::timestamp with time zone) END ('2024-02-01 00:00:00+08'::timestamp with time zone) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES FROM ('2024-01-01 00:00:00+08') TO ('2024-02-01 00:00:00+08')"}, + {"list text", "PARTITION pa VALUES('beijing', 'shanghai') WITH (appendonly='true', blocksize='1048576')", "FOR VALUES IN ('beijing', 'shanghai')"}, + {"list text spaces", "PARTITION pa VALUES('New York', 'Los Angeles') WITH (appendonly='true', blocksize='1048576')", "FOR VALUES IN ('New York', 'Los Angeles')"}, + {"list int", "PARTITION active VALUES(1, 2, 3) WITH (appendonly='true', blocksize='1048576')", "FOR VALUES IN (1, 2, 3)"}, + {"list char", "PARTITION py VALUES('Y') WITH (appendonly='true', blocksize='1048576')", "FOR VALUES IN ('Y')"}, + {"default", "DEFAULT PARTITION pother WITH (appendonly='true', blocksize='1048576')", "DEFAULT"}, + } + for _, c := range cases { + g6 := normalizeBoundary(c.gp6) + g7 := normalizeBoundary(c.gp7) + if g6 != g7 { + t.Errorf("[%s] cross-version mismatch:\n GP6 %q -> %q\n GP7 %q -> %q", c.label, c.gp6, g6, c.gp7, g7) + } + } +} diff --git a/copy/copy.go b/copy/copy.go index 603e0c7..7080dff 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -122,6 +122,8 @@ func (app *Application) SetFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool(option.COMPRESSION, false, "Enable compression for data transfer (master: snappy, segment: zstd by default)") flagSet.String(option.COMPRESS_TYPE, "zstd", "Compression algorithm for segment copy: \"gzip\", \"snappy\", or \"zstd\" (master copy always uses snappy)") flagSet.Int(option.ON_SEGMENT_THRESHOLD, 1000000, "Copy between Coordinators directly, if the table has smaller or same number of rows") + flagSet.Bool(option.COPY_ON_SEGMENT, false, "Force all tables to be copied ON SEGMENT, bypassing the --on-segment-threshold row-count check") + flagSet.Bool(option.ALLOW_PARTITION_RESHAPE, false, "Allow root->root fallback when source and destination partition structures differ (leaf count or boundary mismatch). Without this flag, structural mismatches are fatal.") flagSet.Bool(option.QUIET, false, "Suppress non-warning, non-error log messages") flagSet.String(option.SOURCE_HOST, "127.0.0.1", "The host of source cluster. Must be reachable from the destination cluster under --connection-mode pull.") flagSet.Int(option.SOURCE_PORT, 5432, "The port of source cluster") diff --git a/copy/copy_command.go b/copy/copy_command.go index d789b6f..67c0230 100644 --- a/copy/copy_command.go +++ b/copy/copy_command.go @@ -553,6 +553,7 @@ func createTestCopyStrategy(strategy string, workerId int, srcSegs []utils.Segme // - Segment copy (large tables): only uses zstd or gzip (default: zstd) func CreateCopyStrategy(isReplicated bool, numTuples int64, + forceOnSegment bool, workerId int, srcSegs []utils.SegmentHostInfo, destSegs []utils.SegmentHostInfo, @@ -572,7 +573,9 @@ func CreateCopyStrategy(isReplicated bool, return createTestCopyStrategy(strategy, workerId, srcSegs, destSegs, connectionMode, useCompression) } - if numTuples <= int64(utils.MustGetFlagInt(option.ON_SEGMENT_THRESHOLD)) { + globalForce := utils.MustGetFlagBool(option.COPY_ON_SEGMENT) + if !forceOnSegment && !globalForce && + numTuples <= int64(utils.MustGetFlagInt(option.ON_SEGMENT_THRESHOLD)) { compArg := getCompressArg(true, useCompression) return &CopyOnMaster{CopyBase: CopyBase{WorkerId: workerId, SrcSegmentsHostInfo: srcSegs, DestSegmentsHostInfo: destSegs, ConnectionMode: connectionMode, CompArg: compArg}} } diff --git a/copy/copy_manager.go b/copy/copy_manager.go index b573239..b847e22 100644 --- a/copy/copy_manager.go +++ b/copy/copy_manager.go @@ -184,6 +184,7 @@ func (tc *TableCopier) cleanupAfterCopy(isSkipped bool, inTxn bool, err error) { func (tc *TableCopier) copyData() error { command := CreateCopyStrategy(tc.srcTable.IsReplicated, tc.srcTable.RelTuples, + tc.srcTable.ForceOnSegment, tc.workerID, tc.manager.srcSegmentsHostInfo, tc.manager.destSegmentsIpInfo, diff --git a/copy/copy_metadata.go b/copy/copy_metadata.go index 895c173..3a39b70 100644 --- a/copy/copy_metadata.go +++ b/copy/copy_metadata.go @@ -191,10 +191,11 @@ func (m *MetadataManager) fillTablePairChan(srcTables, destTables []option.Table for i, t := range srcTables { tablec <- option.TablePair{ SrcTable: option.Table{ - Schema: t.Schema, - Name: t.Name, - RelTuples: t.RelTuples, - IsReplicated: t.IsReplicated, + Schema: t.Schema, + Name: t.Name, + RelTuples: t.RelTuples, + IsReplicated: t.IsReplicated, + ForceOnSegment: t.ForceOnSegment, }, DestTable: option.Table{ Schema: destTables[i].Schema, diff --git a/copy/copy_query.go b/copy/copy_query.go index 75ca90e..ed5d21f 100644 --- a/copy/copy_query.go +++ b/copy/copy_query.go @@ -236,6 +236,12 @@ type PartLeafTable struct { RootName string LeafName string RelTuples int64 + // Boundary is a name-independent, deterministic representation of the + // leaf's partition range/list bound. Two leaves with the same bound + // produce the same Boundary string regardless of their partition names, + // so it is used to pair source and destination leaves (never the + // _1_prt_N name suffix, which drifts after add/drop partition). + Boundary string } // GetPartitionLeafTables retrieves partition leaf tables from the database @@ -249,7 +255,8 @@ func (qm *QueryManager) GetPartitionLeafTables(conn *dbconn.DBConn) ([]PartLeafT SELECT quote_ident(n.nspname) || '.' || quote_ident(relname) AS rootname, quote_ident(n.nspname) || '.' || (SELECT quote_ident(relname) FROM pg_class WHERE oid = inhrelid ) - AS leafname, cast(reltuples as bigint) AS relTuples + AS leafname, cast(reltuples as bigint) AS relTuples, + coalesce((SELECT pg_get_expr(relpartbound, oid) FROM pg_class WHERE oid = inhrelid), '') AS boundary FROM pg_class c JOIN pg_inherits p ON c.oid = p.inhparent @@ -263,11 +270,12 @@ func (qm *QueryManager) GetPartitionLeafTables(conn *dbconn.DBConn) ([]PartLeafT ORDER BY n.nspname, leafname;` } else { query = ` - SELECT quote_ident(na.nspname) || '.' || quote_ident(cb.relname) AS rootname, quote_ident(pc.schema) ||'.' || quote_ident(pc.name) AS leafname, pc.relTuples + SELECT quote_ident(na.nspname) || '.' || quote_ident(cb.relname) AS rootname, quote_ident(pc.schema) ||'.' || quote_ident(pc.name) AS leafname, pc.relTuples, pc.boundary FROM pg_namespace na JOIN pg_class cb ON na.oid = cb.relnamespace JOIN (SELECT - p.parrelid AS parentid, n.nspname AS schema, cparent.relname AS name, cast(cparent.reltuples as bigint) AS relTuples + p.parrelid AS parentid, n.nspname AS schema, cparent.relname AS name, cast(cparent.reltuples as bigint) AS relTuples, + coalesce(pg_get_partition_rule_def(r.oid, true), '') AS boundary FROM pg_partition p JOIN pg_partition_rule r ON p.oid = r.paroid JOIN pg_class cparent ON cparent.oid = r.parchildrelid diff --git a/copy/copy_query_wrapper.go b/copy/copy_query_wrapper.go index cb64378..18346dd 100644 --- a/copy/copy_query_wrapper.go +++ b/copy/copy_query_wrapper.go @@ -459,8 +459,176 @@ func (qw *QueryWrapper) GetPartitionLeafTables(conn *dbconn.DBConn, isDest bool) return tables, nil } +// leavesByRoot returns the partition leaf tables whose root equals rootFqn. +func leavesByRoot(leaves []PartLeafTable, rootFqn string) []PartLeafTable { + results := make([]PartLeafTable, 0) + for _, l := range leaves { + if l.RootName == rootFqn { + results = append(results, l) + } + } + return results +} + +// PairAction describes the outcome of a leaf-pairing attempt. +type PairAction string + +const ( + PairFull PairAction = "full" // all src leaves matched — leaf-to-leaf + PairPartial PairAction = "partial" // some matched, some unmatched — needs allowReshape + PairRootFallback PairAction = "root_fallback" // zero matches + allowReshape — root→root + PairFatal PairAction = "fatal" // partial or zero matches without allowReshape +) + +// decidePairAction returns the action to take based on match counts and the +// --allow-partition-reshape flag. This is a pure function with no side effects, +// making it straightforward to test. +func decidePairAction(matchedCount, unmatchedCount int, allowReshape bool) PairAction { + if matchedCount == 0 { + if allowReshape { + return PairRootFallback + } + return PairFatal + } + if unmatchedCount > 0 { + if allowReshape { + return PairPartial + } + return PairFatal + } + return PairFull +} + +// matchLeavesByBoundary is the pure, DB-free matching core. It normalises +// boundary strings on both sides and pairs src leaves to dst leaves by equal +// normalised boundary. Leaves with empty, duplicate, or unmatched boundaries +// are returned in unmatchedSrc. +func matchLeavesByBoundary(srcLeaves, destLeaves []PartLeafTable) ( + matchedSrc, matchedDst []option.Table, unmatchedSrc []PartLeafTable, +) { + if len(srcLeaves) == 0 || len(destLeaves) == 0 { + return nil, nil, nil + } + + // Build destination lookup by normalised boundary. + destByNorm := make(map[string]PartLeafTable, len(destLeaves)) + for _, d := range destLeaves { + nb := normalizeBoundary(d.Boundary) + if nb == "" { + continue + } + if _, dup := destByNorm[nb]; dup { + delete(destByNorm, nb) + continue + } + destByNorm[nb] = d + } + + // Count normalised src boundaries to detect duplicates. + normCount := make(map[string]int, len(srcLeaves)) + for _, s := range srcLeaves { + normCount[normalizeBoundary(s.Boundary)]++ + } + + leafSrc := make([]option.Table, 0, len(srcLeaves)) + leafDst := make([]option.Table, 0, len(srcLeaves)) + unmatched := make([]PartLeafTable, 0) + + for _, s := range srcLeaves { + nb := normalizeBoundary(s.Boundary) + if nb == "" || normCount[nb] > 1 { + unmatched = append(unmatched, s) + continue + } + + d, found := destByNorm[nb] + if !found { + unmatched = append(unmatched, s) + continue + } + + ss := strings.Split(s.LeafName, ".") + ds := strings.Split(d.LeafName, ".") + if len(ss) != 2 || len(ds) != 2 { + unmatched = append(unmatched, s) + continue + } + + leafSrc = append(leafSrc, option.Table{Schema: ss[0], Name: ss[1], Partition: 0, RelTuples: s.RelTuples}) + leafDst = append(leafDst, option.Table{Schema: ds[0], Name: ds[1], Partition: 0, RelTuples: s.RelTuples}) + } + return leafSrc, leafDst, unmatched +} + +// pairPartitionLeaves orchestrates leaf pairing: fetches leaves from the DB, +// calls matchLeavesByBoundary for the pure matching, then applies the +// decidePairAction gate to determine whether the copy should proceed, fall +// back, or abort. +func (qw *QueryWrapper) pairPartitionLeaves(srcConn, destConn *dbconn.DBConn, + srcRoot option.Table, srcRootFqn, destRootFqn string, allowReshape bool, +) (matchedSrc, matchedDst []option.Table, unmatchedSrc []PartLeafTable, rootFallback bool) { + + srcLeavesAll, err := qw.GetPartitionLeafTables(srcConn, false) + if err != nil { + gplog.Fatal(errors.Errorf("leaf pairing for %q: cannot read source leaves: %v", srcRootFqn, err), "") + } + destLeavesAll, err := qw.GetPartitionLeafTables(destConn, true) + if err != nil { + gplog.Fatal(errors.Errorf("leaf pairing for %q: cannot read dest leaves: %v", destRootFqn, err), "") + } + + srcLeaves := leavesByRoot(srcLeavesAll, srcRootFqn) + destLeaves := leavesByRoot(destLeavesAll, destRootFqn) + + if len(srcLeaves) == 0 { + gplog.Warn("leaf pairing for %q->%q: src has no partition leaves; falling back to root->root", + srcRootFqn, destRootFqn) + return nil, nil, nil, true + } + if len(destLeaves) == 0 { + gplog.Warn("leaf pairing for %q->%q: destination has no partition leaves (plain table); "+ + "falling back to root->root (data will be flattened into a single table)", + srcRootFqn, destRootFqn) + return nil, nil, nil, true + } + + leafSrc, leafDst, unmatched := matchLeavesByBoundary(srcLeaves, destLeaves) + + action := decidePairAction(len(leafSrc), len(unmatched), allowReshape) + switch action { + case PairFatal: + if len(leafSrc) == 0 { + gplog.Fatal(errors.Errorf("leaf pairing for %q->%q: no boundary matches found (boundary format may differ across versions). "+ + "Use --allow-partition-reshape to allow root->root fallback.", srcRootFqn, destRootFqn), "") + } else { + gplog.Fatal(errors.Errorf("leaf pairing for %q->%q: %d of %d src leaves could not be matched by boundary. "+ + "Use --allow-partition-reshape to allow partial leaf-to-leaf copy with unmatched leaves falling back to dst root.", + srcRootFqn, destRootFqn, len(unmatched), len(srcLeaves)), "") + } + case PairRootFallback: + gplog.Warn("leaf pairing for %q->%q: no boundary matches found; falling back to root->root ON SEGMENT (per --allow-partition-reshape)", + srcRootFqn, destRootFqn) + return nil, nil, nil, true + case PairPartial: + gplog.Warn("leaf pairing for %q->%q: %d of %d src leaves could not be matched by boundary; "+ + "unmatched leaves will be copied individually to dst root (per --allow-partition-reshape). "+ + "Ensure dst has a DEFAULT partition or that all unmatched values fall within existing dst ranges.", + srcRootFqn, destRootFqn, len(unmatched), len(srcLeaves)) + case PairFull: + gplog.Info("partition %q->%q: all %d leaves matched by boundary (leaf-to-leaf)", srcRootFqn, destRootFqn, len(leafSrc)) + } + + for i := range leafSrc { + gplog.Debug("leaf pair by boundary: %q.%q -> %q.%q (reltuples=%v)", + leafSrc[i].Schema, leafSrc[i].Name, leafDst[i].Schema, leafDst[i].Name, leafSrc[i].RelTuples) + } + + return leafSrc, leafDst, unmatched, false +} + // excludeTablePair excludes table pairs based on source and destination tables -func (qw *QueryWrapper) excludeTablePair(srcTables, destTables, exclTables []option.Table, +func (qw *QueryWrapper) excludeTablePair(srcConn, destConn *dbconn.DBConn, + srcTables, destTables, exclTables []option.Table, userTables map[string]option.TableStatistics, dbname string) ([]option.Table, []option.Table) { if len(srcTables) != len(destTables) { @@ -471,6 +639,13 @@ func (qw *QueryWrapper) excludeTablePair(srcTables, destTables, exclTables []opt excludedDstTabs := make([]option.Table, 0) tabMap := make(map[string]string) + // Build source table info map for partition root lookup + srcTabInfo := make(map[string]option.Table) + for _, t := range srcTables { + k := t.Schema + "." + t.Name + srcTabInfo[k] = t + } + // Build table mapping for i, t := range srcTables { src := t.Schema + "." + t.Name @@ -488,6 +663,63 @@ func (qw *QueryWrapper) excludeTablePair(srcTables, destTables, exclTables []opt for k, v := range tabMap { u, exists := userTables[k] if !exists { + // Check if it's a partition root table + if srcTab, ok := srcTabInfo[k]; ok && srcTab.Partition == 1 { + sls := strings.Split(k, ".") + sld := strings.Split(v, ".") + + allowReshape := utils.MustGetFlagBool(option.ALLOW_PARTITION_RESHAPE) + matchedSrc, matchedDst, unmatchedLeaves, rootFallback := qw.pairPartitionLeaves(srcConn, destConn, srcTab, k, v, allowReshape) + + if rootFallback { + // Full root->root (plain dest, zero matches, or read error). + gplog.Warn("mapping partition root %q to %q as root->root ON SEGMENT; per-leaf parallelism is lost", k, v) + excludedSrcTabs = append(excludedSrcTabs, option.Table{ + Schema: sls[0], + Name: sls[1], + Partition: 1, + RelTuples: srcTab.RelTuples, + ForceOnSegment: true, + }) + excludedDstTabs = append(excludedDstTabs, option.Table{ + Schema: sld[0], + Name: sld[1], + Partition: 0, + RelTuples: srcTab.RelTuples, + }) + continue + } + + // Add matched leaf pairs (leaf-to-leaf). + excludedSrcTabs = append(excludedSrcTabs, matchedSrc...) + excludedDstTabs = append(excludedDstTabs, matchedDst...) + gplog.Info("mapping partition root %q to %q: %d leaf->leaf pair(s) by boundary", k, v, len(matchedSrc)) + + // Add unmatched src leaves as individual copies to dst root. + for _, ul := range unmatchedLeaves { + uls := strings.Split(ul.LeafName, ".") + if len(uls) != 2 { + gplog.Warn("unexpected leaf name format %q; skipping unmatched leaf", ul.LeafName) + continue + } + gplog.Warn("unmatched src leaf %q will be copied individually to dst root %q", ul.LeafName, v) + excludedSrcTabs = append(excludedSrcTabs, option.Table{ + Schema: uls[0], + Name: uls[1], + Partition: 0, + RelTuples: ul.RelTuples, + ForceOnSegment: true, + }) + excludedDstTabs = append(excludedDstTabs, option.Table{ + Schema: sld[0], + Name: sld[1], + Partition: 0, + RelTuples: ul.RelTuples, + }) + } + continue + } + gplog.Warn("Relation \"%v\" does not exists on source database \"%v\"", k, dbname) continue } @@ -571,6 +803,7 @@ func (qw *QueryWrapper) processDestinationTables(srcConn, destConn *dbconn.DBCon config.ValidateDestTables(destTables, destConn.DBName) excludedSrcTabs, excludedDstTabs := qw.excludeTablePair( + srcConn, destConn, config.GetIncludeTablesByDb(srcConn.DBName), config.GetDestTablesByDb(destConn.DBName), config.GetExclTablesByDb(srcConn.DBName), diff --git a/copy/copy_query_wrapper_test.go b/copy/copy_query_wrapper_test.go new file mode 100644 index 0000000..b8b7d4c --- /dev/null +++ b/copy/copy_query_wrapper_test.go @@ -0,0 +1,340 @@ +package copy + +import ( + "testing" +) + +// ---- leavesByRoot ---- + +func TestLeavesByRoot_ReturnsMatchingLeaves(t *testing.T) { + leaves := []PartLeafTable{ + {RootName: "public.sales", LeafName: "public.sales_1_prt_q1", Boundary: "b1"}, + {RootName: "public.sales", LeafName: "public.sales_1_prt_q2", Boundary: "b2"}, + {RootName: "public.orders", LeafName: "public.orders_1_prt_q1", Boundary: "b1"}, + } + got := leavesByRoot(leaves, "public.sales") + if len(got) != 2 { + t.Fatalf("expected 2 leaves, got %d", len(got)) + } + for _, l := range got { + if l.RootName != "public.sales" { + t.Errorf("unexpected root %q", l.RootName) + } + } +} + +func TestLeavesByRoot_NoMatch(t *testing.T) { + leaves := []PartLeafTable{ + {RootName: "public.orders", LeafName: "public.orders_1_prt_q1", Boundary: "b1"}, + } + got := leavesByRoot(leaves, "public.sales") + if len(got) != 0 { + t.Fatalf("expected 0 leaves, got %d", len(got)) + } +} + +func TestLeavesByRoot_EmptyInput(t *testing.T) { + got := leavesByRoot(nil, "public.sales") + if len(got) != 0 { + t.Fatalf("expected 0 leaves, got %d", len(got)) + } +} + +// ---- helpers ---- + +func makeLeaves(root string, specs [][2]string) []PartLeafTable { + out := make([]PartLeafTable, len(specs)) + for i, s := range specs { + out[i] = PartLeafTable{RootName: root, LeafName: s[0], Boundary: s[1]} + } + return out +} + +// ---- matchLeavesByBoundary: same-version matching ---- + +func TestMatchLeaves_SuccessExactMatch(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "FOR VALUES FROM ('2024-01-01') TO ('2024-04-01')"}, + {"public.sales_q2", "FOR VALUES FROM ('2024-04-01') TO ('2024-07-01')"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h2", "FOR VALUES FROM ('2024-04-01') TO ('2024-07-01')"}, + {"public.sales_h1", "FOR VALUES FROM ('2024-01-01') TO ('2024-04-01')"}, + }) + + matchedSrc, matchedDst, unmatched := matchLeavesByBoundary(src, dst) + if len(unmatched) != 0 { + t.Fatalf("expected 0 unmatched, got %d", len(unmatched)) + } + if len(matchedSrc) != 2 || len(matchedDst) != 2 { + t.Fatalf("expected 2 pairs, got src=%d dst=%d", len(matchedSrc), len(matchedDst)) + } + if matchedSrc[0].Name != "sales_q1" { + t.Errorf("unexpected src[0] name %q", matchedSrc[0].Name) + } + if matchedDst[0].Name != "sales_h1" { + t.Errorf("unexpected dst[0] name %q (want sales_h1, boundary-matched)", matchedDst[0].Name) + } +} + +// ---- matchLeavesByBoundary: partial match cases ---- + +func TestMatchLeaves_LeafCountMismatch_PartialMatch(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + {"public.sales_q2", "b2"}, + {"public.sales_q3", "b3"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + {"public.sales_h2", "b2"}, + }) + + matchedSrc, matchedDst, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 2 || len(matchedDst) != 2 { + t.Fatalf("expected 2 matched pairs, got %d", len(matchedSrc)) + } + if len(unmatched) != 1 { + t.Fatalf("expected 1 unmatched leaf, got %d", len(unmatched)) + } + if unmatched[0].LeafName != "public.sales_q3" { + t.Errorf("expected unmatched leaf sales_q3, got %q", unmatched[0].LeafName) + } +} + +func TestMatchLeaves_NoBoundaryMatchForSrcLeaf_PartialMatch(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + {"public.sales_q2", "b_no_match"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + {"public.sales_h2", "b2"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 1 { + t.Fatalf("expected 1 matched pair, got %d", len(matchedSrc)) + } + if len(unmatched) != 1 { + t.Fatalf("expected 1 unmatched, got %d", len(unmatched)) + } + if unmatched[0].LeafName != "public.sales_q2" { + t.Errorf("unexpected unmatched leaf %q", unmatched[0].LeafName) + } +} + +func TestMatchLeaves_EmptySrcBoundary_TreatedAsUnmatched(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", ""}, + {"public.sales_q2", "b2"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + {"public.sales_h2", "b2"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 1 { + t.Fatalf("expected 1 matched pair, got %d", len(matchedSrc)) + } + if len(unmatched) != 1 || unmatched[0].LeafName != "public.sales_q1" { + t.Fatalf("expected sales_q1 as unmatched, got %v", unmatched) + } +} + +func TestMatchLeaves_EmptyDestBoundary_SrcBecomesUnmatched(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", ""}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 0 { + t.Fatalf("expected 0 matched pairs, got %d", len(matchedSrc)) + } + if len(unmatched) != 1 { + t.Fatalf("expected 1 unmatched leaf, got %d", len(unmatched)) + } +} + +func TestMatchLeaves_DuplicateSrcBoundary_BothUnmatched(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + {"public.sales_q2", "b1"}, // duplicate + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + {"public.sales_h2", "b2"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 0 { + t.Fatalf("expected 0 matched pairs (duplicate src boundary), got %d", len(matchedSrc)) + } + if len(unmatched) != 2 { + t.Fatalf("expected 2 unmatched leaves, got %d", len(unmatched)) + } +} + +func TestMatchLeaves_DuplicateDestBoundary_SrcUnmatched(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + {"public.sales_q2", "b2"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + {"public.sales_h2", "b1"}, // duplicate + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(matchedSrc) != 0 { + t.Fatalf("expected 0 matched pairs, got %d", len(matchedSrc)) + } + if len(unmatched) != 2 { + t.Fatalf("expected 2 unmatched, got %d", len(unmatched)) + } +} + +func TestMatchLeaves_DestIsPlainTable_ReturnsNil(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + }) + matchedSrc, matchedDst, unmatched := matchLeavesByBoundary(src, nil) + if matchedSrc != nil || matchedDst != nil || unmatched != nil { + t.Fatal("expected all nil when dest has no leaves") + } +} + +func TestMatchLeaves_SrcHasNoLeaves_ReturnsNil(t *testing.T) { + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + }) + matchedSrc, matchedDst, unmatched := matchLeavesByBoundary(nil, dst) + if matchedSrc != nil || matchedDst != nil || unmatched != nil { + t.Fatal("expected all nil when src has no leaves") + } +} + +func TestMatchLeaves_RelTuplesPreserved(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "b1"}, + }) + src[0].RelTuples = 500000 + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h1", "b1"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(unmatched) != 0 { + t.Fatalf("expected 0 unmatched, got %d", len(unmatched)) + } + if matchedSrc[0].RelTuples != 500000 { + t.Errorf("RelTuples not preserved: got %d", matchedSrc[0].RelTuples) + } +} + +// ---- matchLeavesByBoundary: cross-version boundary normalisation ---- + +func TestMatchLeaves_CrossVersion_GP6toGP7_Range(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_q1", "PARTITION p1 START ('2024-01-01'::date) END ('2024-02-01'::date) WITH (appendonly='true')"}, + {"public.sales_q2", "PARTITION p2 START ('2024-02-01'::date) END ('2024-03-01'::date) WITH (appendonly='true')"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_h2", "FOR VALUES FROM ('2024-02-01') TO ('2024-03-01')"}, + {"public.sales_h1", "FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"}, + }) + + matchedSrc, matchedDst, unmatched := matchLeavesByBoundary(src, dst) + if len(unmatched) != 0 { + t.Fatalf("expected 0 unmatched in cross-version match, got %d", len(unmatched)) + } + if len(matchedSrc) != 2 || len(matchedDst) != 2 { + t.Fatalf("expected 2 pairs, got src=%d dst=%d", len(matchedSrc), len(matchedDst)) + } +} + +func TestMatchLeaves_CrossVersion_GP6toGP7_Default(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_def", "DEFAULT PARTITION pother WITH (appendonly='true')"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_default", "DEFAULT"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(unmatched) != 0 { + t.Fatalf("expected DEFAULT to match cross-version, got %d unmatched", len(unmatched)) + } + if len(matchedSrc) != 1 { + t.Fatalf("expected 1 matched pair, got %d", len(matchedSrc)) + } +} + +func TestMatchLeaves_CrossVersion_GP6toGP7_List(t *testing.T) { + src := makeLeaves("public.sales", [][2]string{ + {"public.sales_l1", "PARTITION pa VALUES('apple', 'banana') WITH (appendonly='true')"}, + }) + dst := makeLeaves("public.sales", [][2]string{ + {"public.sales_list1", "FOR VALUES IN ('apple', 'banana')"}, + }) + + matchedSrc, _, unmatched := matchLeavesByBoundary(src, dst) + if len(unmatched) != 0 { + t.Fatalf("expected LIST to match cross-version, got %d unmatched", len(unmatched)) + } + if len(matchedSrc) != 1 { + t.Fatalf("expected 1 matched pair, got %d", len(matchedSrc)) + } +} + +// ---- decidePairAction ---- + +func TestDecidePairAction_FullMatch(t *testing.T) { + if got := decidePairAction(4, 0, false); got != PairFull { + t.Errorf("full match: expected %q, got %q", PairFull, got) + } + if got := decidePairAction(4, 0, true); got != PairFull { + t.Errorf("full match with flag: expected %q, got %q", PairFull, got) + } +} + +func TestDecidePairAction_PartialMatch_WithFlag(t *testing.T) { + if got := decidePairAction(2, 2, true); got != PairPartial { + t.Errorf("partial+flag: expected %q, got %q", PairPartial, got) + } +} + +func TestDecidePairAction_PartialMatch_NoFlag(t *testing.T) { + if got := decidePairAction(2, 2, false); got != PairFatal { + t.Errorf("partial+no flag: expected %q, got %q", PairFatal, got) + } +} + +func TestDecidePairAction_ZeroMatch_WithFlag(t *testing.T) { + if got := decidePairAction(0, 4, true); got != PairRootFallback { + t.Errorf("zero+flag: expected %q, got %q", PairRootFallback, got) + } +} + +func TestDecidePairAction_ZeroMatch_NoFlag(t *testing.T) { + if got := decidePairAction(0, 4, false); got != PairFatal { + t.Errorf("zero+no flag: expected %q, got %q", PairFatal, got) + } +} + +func TestDecidePairAction_ZeroMatchZeroUnmatched_WithFlag(t *testing.T) { + // Edge case: both sides empty after filtering (e.g. all boundaries duplicated) + if got := decidePairAction(0, 0, true); got != PairRootFallback { + t.Errorf("zero/zero+flag: expected %q, got %q", PairRootFallback, got) + } +} + +func TestDecidePairAction_ZeroMatchZeroUnmatched_NoFlag(t *testing.T) { + if got := decidePairAction(0, 0, false); got != PairFatal { + t.Errorf("zero/zero+no flag: expected %q, got %q", PairFatal, got) + } +} diff --git a/option/option.go b/option/option.go index a831844..a842c25 100644 --- a/option/option.go +++ b/option/option.go @@ -36,6 +36,8 @@ const ( COMPRESSION = "compression" COMPRESS_TYPE = "compress-type" ON_SEGMENT_THRESHOLD = "on-segment-threshold" + COPY_ON_SEGMENT = "copy-on-segment" + ALLOW_PARTITION_RESHAPE = "allow-partition-reshape" QUIET = "quiet" SOURCE_HOST = "source-host" SOURCE_PORT = "source-port" @@ -90,6 +92,11 @@ type Table struct { Partition int RelTuples int64 IsReplicated bool + + // ForceOnSegment skips the row-count threshold check and forces the table + // to be copied ON SEGMENT (used for partition root tables and when the user + // passes --copy-on-segment). + ForceOnSegment bool } type TablePair struct { @@ -572,14 +579,16 @@ func (o Option) GetTablespaceMap() map[string]string { func (o Option) validatePartTables(title string, tables []*DbTable, userTables map[string]TableStatistics, dbname string) { for _, t := range tables { if t.Partition == 1 { - gplog.Fatal(errors.Errorf("Found partition root table: %s.%s.%s in %s list", dbname, t.Schema, t.Name, title), "") + // Partition root tables are allowed; they will be expanded to child partitions later. + continue } k := t.Schema + "." + t.Name _, exists := userTables[k] if !exists { - gplog.Fatal(errors.Errorf("%v \"%v\" does not exists on \"%v\" database", title, k, dbname), "") + gplog.Fatal(errors.Errorf("%v \"%v\" does not exist on \"%v\" database. "+ + "If this is a partition leaf or intermediate partition level, specify the root partition table instead.", title, k, dbname), "") } } } @@ -593,7 +602,20 @@ func (o Option) ValidateExcludeTables(userTables map[string]TableStatistics, dbn } func (o Option) ValidateDestTables(userTables map[string]TableStatistics, dbname string) { - o.validatePartTables("dest table", o.destTables, userTables, dbname) + for _, t := range o.destTables { + if t.Partition == 1 { + // Partition root tables are allowed as dest tables; + // COPY INTO a partitioned table routes data to child partitions. + continue + } + + k := t.Schema + "." + t.Name + + _, exists := userTables[k] + if !exists { + gplog.Fatal(errors.Errorf("dest table \"%v\" does not exist on \"%v\" database", k, dbname), "") + } + } } func MakeIncludeOptions(initialFlags *pflag.FlagSet, testTableName string) {