Skip to content

Commit 06727f2

Browse files
committed
fix: use the correct schema for previously initialized tables on new connections to the database
1 parent f635a21 commit 06727f2

12 files changed

Lines changed: 402 additions & 299 deletions

src/cloudsync.c

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ struct cloudsync_table_context {
167167
#endif
168168

169169
char **pk_name; // array of primary key names
170-
170+
171171
// precompiled statements
172172
dbvm_t *meta_pkexists_stmt; // check if a primary key already exist in the augmented table
173173
dbvm_t *meta_sentinel_update_stmt; // update a local sentinel row
@@ -573,6 +573,13 @@ const char *cloudsync_schema (cloudsync_context *data) {
573573
return data->current_schema;
574574
}
575575

576+
const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
577+
cloudsync_table_context *table = table_lookup(data, table_name);
578+
if (!table) return NULL;
579+
580+
return table->schema;
581+
}
582+
576583
// MARK: - Table Utils -
577584

578585
void table_pknames_free (char **names, int nrows) {
@@ -589,7 +596,7 @@ char *table_build_mergedelete_sql (cloudsync_table_context *table) {
589596
}
590597
#endif
591598

592-
return sql_build_delete_by_pk(table->context, table->name);
599+
return sql_build_delete_by_pk(table->context, table->name, table->schema);
593600
}
594601

595602
char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
@@ -610,9 +617,9 @@ char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *c
610617

611618
if (colname == NULL) {
612619
// is sentinel insert
613-
sql = sql_build_insert_pk_ignore(table->context, table->name);
620+
sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
614621
} else {
615-
sql = sql_build_upsert_pk_and_col(table->context, table->name, colname);
622+
sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
616623
}
617624
return sql;
618625
}
@@ -627,7 +634,7 @@ char *table_build_value_sql (cloudsync_table_context *table, const char *colname
627634
#endif
628635

629636
// SELECT age FROM customers WHERE first_name=? AND last_name=?;
630-
return sql_build_select_cols_by_pk(table->context, table->name, colname);
637+
return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
631638
}
632639

633640
cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
@@ -639,7 +646,18 @@ cloudsync_table_context *table_create (cloudsync_context *data, const char *name
639646
table->context = data;
640647
table->algo = algo;
641648
table->name = cloudsync_string_dup_lowercase(name);
642-
table->schema = (data->current_schema) ? cloudsync_string_dup(data->current_schema) : NULL;
649+
650+
// Detect schema from metadata table location. If metadata table doesn't
651+
// exist yet (during initialization), fall back to cloudsync_schema() which
652+
// returns the explicitly set schema or current_schema().
653+
table->schema = database_table_schema(name);
654+
if (!table->schema) {
655+
const char *fallback_schema = cloudsync_schema(data);
656+
if (fallback_schema) {
657+
table->schema = cloudsync_string_dup(fallback_schema);
658+
}
659+
}
660+
643661
if (!table->name) {
644662
cloudsync_memory_free(table);
645663
return NULL;
@@ -827,7 +845,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
827845

828846
// precompile the get column value statement
829847
if (ncols > 0) {
830-
sql = sql_build_select_nonpk_by_pk(data, table->name);
848+
sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
831849
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
832850
DEBUG_SQL("real_col_values_stmt: %s", sql);
833851

@@ -954,8 +972,14 @@ bool table_ensure_capacity (cloudsync_context *data) {
954972

955973
bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
956974
DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
957-
958-
// check if table is already in the global context and in that case just return
975+
976+
// Check if table already initialized in this connection's context.
977+
// Note: This prevents same-connection duplicate initialization.
978+
// SQLite clients cannot distinguish schemas, so having 'public.users'
979+
// and 'auth.users' would cause sync ambiguity. Users should avoid
980+
// initializing tables with the same name in different schemas.
981+
// If two concurrent connections initialize tables with the same name
982+
// in different schemas, the behavior is undefined.
959983
cloudsync_table_context *table = table_lookup(data, table_name);
960984
if (table) return true;
961985

@@ -967,7 +991,7 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
967991
if (!table) return false;
968992

969993
// fill remaining metadata in the table
970-
int count = database_count_pk(data, table_name, false);
994+
int count = database_count_pk(data, table_name, false, table->schema);
971995
if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
972996
table->npks = count;
973997
if (table->npks == 0) {
@@ -979,7 +1003,7 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
9791003
#endif
9801004
}
9811005

982-
int ncols = database_count_nonpk(data, table_name);
1006+
int ncols = database_count_nonpk(data, table_name, table->schema);
9831007
if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
9841008
int rc = table_add_stmts(table, ncols);
9851009
if (rc != DBRES_OK) goto abort_add_table;
@@ -997,8 +1021,11 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
9971021

9981022
table->col_value_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
9991023
if (!table->col_value_stmt) goto abort_add_table;
1000-
1001-
char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID, table_name, table_name);
1024+
1025+
// Pass empty string when schema is NULL; SQL will fall back to current_schema()
1026+
const char *schema = table->schema ? table->schema : "";
1027+
char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
1028+
table_name, schema, table_name, schema);
10021029
if (!sql) goto abort_add_table;
10031030
rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
10041031
cloudsync_memory_free(sql);
@@ -1678,7 +1705,8 @@ int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *
16781705
} else {
16791706
// compact meta-table
16801707
// delete entries for removed columns
1681-
char *sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_COLS_NOT_IN_SCHEMA_OR_PKCOL, table->meta_ref, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
1708+
const char *schema = table->schema ? table->schema : "";
1709+
char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
16821710
rc = database_exec(data, sql);
16831711
cloudsync_memory_free(sql);
16841712
if (rc != DBRES_OK) {
@@ -1688,7 +1716,7 @@ int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *
16881716

16891717
char buffer[1024];
16901718
char *singlequote_escaped_table_name = sql_escape_name(table->name, buffer, sizeof(buffer));
1691-
sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_PK_QUALIFIED_COLLIST_FMT, singlequote_escaped_table_name, singlequote_escaped_table_name);
1719+
sql = sql_build_pk_qualified_collist_query(schema, singlequote_escaped_table_name);
16921720
if (!sql) {rc = DBRES_NOMEM; goto finalize;}
16931721

16941722
char *pkclause = NULL;
@@ -1775,15 +1803,16 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
17751803
dbvm_t *vm = NULL;
17761804
int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
17771805
char *pkdecode = NULL;
1778-
1779-
char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_PK_COLLIST, table_name);
1806+
1807+
const char *schema = table->schema ? table->schema : "";
1808+
char *sql = sql_build_pk_collist_query(schema, table_name);
17801809
char *pkclause_identifiers = NULL;
17811810
int rc = database_select_text(data, sql, &pkclause_identifiers);
17821811
cloudsync_memory_free(sql);
17831812
if (rc != DBRES_OK) goto finalize;
17841813
char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
1785-
1786-
sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_PK_DECODE_SELECTLIST, table_name);
1814+
1815+
sql = sql_build_pk_decode_selectlist_query(schema, table_name);
17871816
rc = database_select_text(data, sql, &pkdecode);
17881817
cloudsync_memory_free(sql);
17891818
if (rc != DBRES_OK) goto finalize;
@@ -2504,14 +2533,18 @@ int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, boo
25042533
return cloudsync_set_error(data, buffer, DBRES_ERROR);
25052534
}
25062535

2536+
// check if already initialized
2537+
cloudsync_table_context *table = table_lookup(data, name);
2538+
if (table) return DBRES_OK;
2539+
25072540
// check if table exists
2508-
if (database_table_exists(data, name) == false) {
2541+
if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
25092542
snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
25102543
return cloudsync_set_error(data, buffer, DBRES_ERROR);
25112544
}
25122545

25132546
// no more than 128 columns can be used as a composite primary key (SQLite hard limit)
2514-
int npri_keys = database_count_pk(data, name, false);
2547+
int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
25152548
if (npri_keys < 0) return cloudsync_set_dberror(data);
25162549
if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
25172550

@@ -2528,7 +2561,7 @@ int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, boo
25282561
// the affinity of a column is determined by the declared type of the column,
25292562
// according to the following rules in the order shown:
25302563
// 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
2531-
int npri_keys_int = database_count_int_pk(data, name);
2564+
int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
25322565
if (npri_keys_int < 0) return cloudsync_set_dberror(data);
25332566
if (npri_keys == npri_keys_int) {
25342567
snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
@@ -2540,7 +2573,7 @@ int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, boo
25402573

25412574
// if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
25422575
if (npri_keys > 0) {
2543-
int npri_keys_notnull = database_count_pk(data, name, true);
2576+
int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
25442577
if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
25452578
if (npri_keys != npri_keys_notnull) {
25462579
snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
@@ -2550,7 +2583,7 @@ int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, boo
25502583

25512584
// check for columns declared as NOT NULL without a DEFAULT value.
25522585
// Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
2553-
int n_notnull_nodefault = database_count_notnull_without_default(data, name);
2586+
int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
25542587
if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
25552588
if (n_notnull_nodefault > 0) {
25562589
snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);

src/cloudsync.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.82"
20+
#define CLOUDSYNC_VERSION "0.9.83"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1
@@ -81,6 +81,7 @@ int cloudsync_commit_hook (void *ctx);
8181
void cloudsync_rollback_hook (void *ctx);
8282
void cloudsync_set_schema (cloudsync_context *data, const char *schema);
8383
const char *cloudsync_schema (cloudsync_context *data);
84+
const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name);
8485

8586
// Payload
8687
int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *nrows);

src/database.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ int database_select_text (cloudsync_context *data, const char *sql, char **valu
6666
int database_select_blob (cloudsync_context *data, const char *sql, char **value, int64_t *value_len);
6767
int database_select_blob_2int (cloudsync_context *data, const char *sql, char **value, int64_t *value_len, int64_t *value2, int64_t *value3);
6868
int database_write (cloudsync_context *data, const char *sql, const char **values, DBTYPE types[], int lens[], int count);
69-
bool database_table_exists (cloudsync_context *data, const char *table_name);
69+
bool database_table_exists (cloudsync_context *data, const char *table_name, const char *schema);
7070
bool database_internal_table_exists (cloudsync_context *data, const char *name);
7171
bool database_trigger_exists (cloudsync_context *data, const char *table_name);
7272
int database_create_metatable (cloudsync_context *data, const char *table_name);
@@ -75,10 +75,10 @@ int database_delete_triggers (cloudsync_context *data, const char *table_name);
7575
int database_pk_names (cloudsync_context *data, const char *table_name, char ***names, int *count);
7676
int database_cleanup (cloudsync_context *data);
7777

78-
int database_count_pk (cloudsync_context *data, const char *table_name, bool not_null);
79-
int database_count_nonpk (cloudsync_context *data, const char *table_name);
80-
int database_count_int_pk (cloudsync_context *data, const char *table_name);
81-
int database_count_notnull_without_default (cloudsync_context *data, const char *table_name);
78+
int database_count_pk (cloudsync_context *data, const char *table_name, bool not_null, const char *schema);
79+
int database_count_nonpk (cloudsync_context *data, const char *table_name, const char *schema);
80+
int database_count_int_pk (cloudsync_context *data, const char *table_name, const char *schema);
81+
int database_count_notnull_without_default (cloudsync_context *data, const char *table_name, const char *schema);
8282

8383
int64_t database_schema_version (cloudsync_context *data);
8484
uint64_t database_schema_hash (cloudsync_context *data);
@@ -139,13 +139,18 @@ uint64_t dbmem_size (void *ptr);
139139
// SQL
140140
char *sql_build_drop_table (const char *table_name, char *buffer, int bsize, bool is_meta);
141141
char *sql_escape_name (const char *name, char *buffer, size_t bsize);
142-
char *sql_build_select_nonpk_by_pk (cloudsync_context *data, const char *table_name);
143-
char *sql_build_delete_by_pk (cloudsync_context *data, const char *table_name);
144-
char *sql_build_insert_pk_ignore (cloudsync_context *data, const char *table_name);
145-
char *sql_build_upsert_pk_and_col (cloudsync_context *data, const char *table_name, const char *colname);
146-
char *sql_build_select_cols_by_pk (cloudsync_context *data, const char *table_name, const char *colname);
142+
char *sql_build_select_nonpk_by_pk (cloudsync_context *data, const char *table_name, const char *schema);
143+
char *sql_build_delete_by_pk (cloudsync_context *data, const char *table_name, const char *schema);
144+
char *sql_build_insert_pk_ignore (cloudsync_context *data, const char *table_name, const char *schema);
145+
char *sql_build_upsert_pk_and_col (cloudsync_context *data, const char *table_name, const char *colname, const char *schema);
146+
char *sql_build_select_cols_by_pk (cloudsync_context *data, const char *table_name, const char *colname, const char *schema);
147147
char *sql_build_rekey_pk_and_reset_version_except_col (cloudsync_context *data, const char *table_name, const char *except_col);
148+
char *sql_build_delete_cols_not_in_schema_query(const char *schema, const char *table_name, const char *meta_ref, const char *pkcol);
149+
char *sql_build_pk_collist_query(const char *schema, const char *table_name);
150+
char *sql_build_pk_decode_selectlist_query(const char *schema, const char *table_name);
151+
char *sql_build_pk_qualified_collist_query(const char *schema, const char *table_name);
148152

153+
char *database_table_schema(const char *table_name);
149154
char *database_build_meta_ref(const char *schema, const char *table_name);
150155
char *database_build_base_ref(const char *schema, const char *table_name);
151156

src/postgresql/cloudsync--1.0.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,8 @@ RETURNS text
271271
AS 'MODULE_PATHNAME', 'pg_cloudsync_schema'
272272
LANGUAGE C VOLATILE;
273273

274+
-- Get current schema name (if any)
275+
CREATE OR REPLACE FUNCTION cloudsync_table_schema(table_name text)
276+
RETURNS text
277+
AS 'MODULE_PATHNAME', 'pg_cloudsync_table_schema'
278+
LANGUAGE C VOLATILE;

src/postgresql/cloudsync_postgresql.c

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,7 +1118,7 @@ Datum cloudsync_insert (PG_FUNCTION_ARGS) {
11181118
if (!table) {
11191119
char meta_name[1024];
11201120
snprintf(meta_name, sizeof(meta_name), "%s_cloudsync", table_name);
1121-
if (!database_table_exists(data, meta_name)) {
1121+
if (!database_table_exists(data, meta_name, cloudsync_schema(data))) {
11221122
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Unable to retrieve table name %s in cloudsync_insert", table_name)));
11231123
}
11241124

@@ -1212,7 +1212,7 @@ Datum cloudsync_delete (PG_FUNCTION_ARGS) {
12121212
if (!table) {
12131213
char meta_name[1024];
12141214
snprintf(meta_name, sizeof(meta_name), "%s_cloudsync", table_name);
1215-
if (!database_table_exists(data, meta_name)) {
1215+
if (!database_table_exists(data, meta_name, cloudsync_schema(data))) {
12161216
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Unable to retrieve table name %s in cloudsync_delete", table_name)));
12171217
}
12181218

@@ -1403,7 +1403,7 @@ Datum cloudsync_update_finalfn (PG_FUNCTION_ARGS) {
14031403
if (!table) {
14041404
char meta_name[1024];
14051405
snprintf(meta_name, sizeof(meta_name), "%s_cloudsync", table_name);
1406-
if (!database_table_exists(data, meta_name)) {
1406+
if (!database_table_exists(data, meta_name, cloudsync_schema(data))) {
14071407
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Unable to retrieve table name %s in cloudsync_update", table_name)));
14081408
}
14091409

@@ -1543,6 +1543,23 @@ Datum pg_cloudsync_schema (PG_FUNCTION_ARGS) {
15431543
PG_RETURN_TEXT_P(cstring_to_text(schema));
15441544
}
15451545

1546+
PG_FUNCTION_INFO_V1(pg_cloudsync_table_schema);
1547+
Datum pg_cloudsync_table_schema (PG_FUNCTION_ARGS) {
1548+
if (PG_ARGISNULL(0)) {
1549+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("table_name cannot be NULL")));
1550+
}
1551+
1552+
const char *table_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
1553+
cloudsync_context *data = get_cloudsync_context();
1554+
const char *schema = cloudsync_table_schema(data, table_name);
1555+
1556+
if (!schema) {
1557+
PG_RETURN_NULL();
1558+
}
1559+
1560+
PG_RETURN_TEXT_P(cstring_to_text(schema));
1561+
}
1562+
15461563
// MARK: - Changes -
15471564

15481565
// Encode a single value using cloudsync pk encoding

0 commit comments

Comments
 (0)