Skip to content

Commit 44ffc42

Browse files
authored
Merge branch 'IVORYSQL_REL_1_STABLE' into IVORYSQL_REL_1_STABLE
2 parents ed8052c + f8a01f0 commit 44ffc42

8 files changed

Lines changed: 428 additions & 7 deletions

File tree

src/backend/access/heap/heapam.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "pgstat.h"
5959
#include "port/atomics.h"
6060
#include "port/pg_bitutils.h"
61+
#include "replication/logicalrelation.h"
6162
#include "storage/bufmgr.h"
6263
#include "storage/freespace.h"
6364
#include "storage/lmgr.h"
@@ -2974,7 +2975,7 @@ heap_delete(Relation relation, ItemPointer tid,
29742975

29752976
if (old_key_tuple != NULL)
29762977
{
2977-
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
2978+
if (logicalrep_identity_is_full(relation))
29782979
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
29792980
else
29802981
xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
@@ -8607,7 +8608,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
86078608
xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
86088609
if (old_key_tuple)
86098610
{
8610-
if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
8611+
if (logicalrep_identity_is_full(reln))
86118612
xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
86128613
else
86138614
xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
@@ -8834,7 +8835,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed,
88348835
if (replident == REPLICA_IDENTITY_NOTHING)
88358836
return NULL;
88368837

8837-
if (replident == REPLICA_IDENTITY_FULL)
8838+
if (logicalrep_identity_is_full(relation))
88388839
{
88398840
/*
88408841
* When logging the entire old tuple, it very well could contain

src/backend/executor/execReplication.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "storage/bufmgr.h"
3030
#include "storage/lmgr.h"
3131
#include "utils/builtins.h"
32+
#include "replication/logicalrelation.h"
3233
#include "utils/datum.h"
3334
#include "utils/lsyscache.h"
3435
#include "utils/memutils.h"
@@ -594,8 +595,8 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
594595
return;
595596

596597
/* If relation has replica identity we are always good. */
597-
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
598-
OidIsValid(RelationGetReplicaIndex(rel)))
598+
if (OidIsValid(RelationGetReplicaIndex(rel)) ||
599+
logicalrep_identity_is_full(rel))
599600
return;
600601

601602
/*

src/backend/replication/logical/proto.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "catalog/pg_type.h"
1818
#include "libpq/pqformat.h"
1919
#include "replication/logicalproto.h"
20+
#include "replication/logicalrelation.h"
2021
#include "utils/lsyscache.h"
2122
#include "utils/syscache.h"
2223

@@ -396,6 +397,7 @@ void
396397
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
397398
{
398399
char *relname;
400+
char relreplident = rel->rd_rel->relreplident;
399401

400402
pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
401403

@@ -412,7 +414,9 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
412414
pq_sendstring(out, relname);
413415

414416
/* send replica identity */
415-
pq_sendbyte(out, rel->rd_rel->relreplident);
417+
if (logicalrep_identity_is_full(rel))
418+
relreplident = REPLICA_IDENTITY_FULL;
419+
pq_sendbyte(out, relreplident);
416420

417421
/* send the attribute info */
418422
logicalrep_write_attrs(out, rel);
@@ -666,7 +670,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
666670
pq_sendint16(out, nliveatts);
667671

668672
/* fetch bitmap of REPLICATION IDENTITY attributes */
669-
replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
673+
replidentfull = logicalrep_identity_is_full(rel);
670674
if (!replidentfull)
671675
idattrs = RelationGetIdentityKeyBitmap(rel);
672676

src/backend/replication/logical/relation.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
#include "replication/logicalrelation.h"
2626
#include "replication/worker_internal.h"
2727
#include "utils/inval.h"
28+
#include "utils/syscache.h"
2829

2930

31+
bool logical_replication_fallback_to_full_identity = false;
32+
3033
static MemoryContext LogicalRepRelMapContext = NULL;
3134

3235
static HTAB *LogicalRepRelMap = NULL;
@@ -703,3 +706,28 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
703706

704707
return entry;
705708
}
709+
710+
/*
711+
* logicalrep_identity_is_full
712+
*
713+
* Check whether the replica identity of the relation is full or not.
714+
* When a table's replica identity is default, but there is no primary key,
715+
* if logical_replication_fallback_to_full_identity is true, we consider the
716+
* replica identity as full. This function should only be called on the
717+
* publisher.
718+
*/
719+
bool
720+
logicalrep_identity_is_full(Relation relation)
721+
{
722+
Form_pg_class relform = RelationGetForm(relation);
723+
724+
if (relform->relreplident == REPLICA_IDENTITY_FULL)
725+
return true;
726+
727+
if (relform->relreplident == REPLICA_IDENTITY_DEFAULT &&
728+
logical_replication_fallback_to_full_identity &&
729+
!OidIsValid(RelationGetReplicaIndex(relation)))
730+
return true;
731+
732+
return false;
733+
}

src/backend/utils/misc/guc.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
#include "postmaster/syslogger.h"
7676
#include "postmaster/walwriter.h"
7777
#include "replication/logicallauncher.h"
78+
#include "replication/logicalrelation.h"
7879
#include "replication/reorderbuffer.h"
7980
#include "replication/slot.h"
8081
#include "replication/syncrep.h"
@@ -1223,6 +1224,15 @@ static struct config_bool ConfigureNamesBool[] =
12231224
false,
12241225
NULL, NULL, NULL
12251226
},
1227+
{
1228+
{"logical_replication_fallback_to_full_identity", PGC_SIGHUP, REPLICATION_SENDING,
1229+
gettext_noop("Use REPLICA IDENTITY FULL automatically when a table with DEFAULT identity has no primary key."),
1230+
gettext_noop("When enabled, logical replication will automatically send full-row data for tables that specify REPLICA IDENTITY DEFAULT but lack a primary key, instead of raising an error.")
1231+
},
1232+
&logical_replication_fallback_to_full_identity,
1233+
false,
1234+
NULL, NULL, NULL
1235+
},
12261236
{
12271237
{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
12281238
gettext_noop("Enables SSL connections."),

src/backend/utils/misc/postgresql.conf.sample

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@
304304
#wal_sender_timeout = 60s # in milliseconds; 0 disables
305305
#track_commit_timestamp = off # collect timestamp of transaction commit
306306
# (change requires restart)
307+
#logical_replication_fallback_to_full_identity = off # fallback default
308+
# replication identity to full if no primary key
307309

308310
# - Primary Server -
309311

src/include/replication/logicalrelation.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
#define LOGICALRELATION_H
1414

1515
#include "access/attmap.h"
16+
#include "catalog/index.h"
1617
#include "replication/logicalproto.h"
1718

19+
/* GUC variables */
20+
extern PGDLLIMPORT bool logical_replication_fallback_to_full_identity;
21+
1822
typedef struct LogicalRepRelMapEntry
1923
{
2024
LogicalRepRelation remoterel; /* key is remoterel.remoteid */
@@ -46,5 +50,6 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
4650
Relation partrel, AttrMap *map);
4751
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
4852
LOCKMODE lockmode);
53+
extern bool logicalrep_identity_is_full(Relation relation);
4954

5055
#endif /* LOGICALRELATION_H */

0 commit comments

Comments
 (0)