Skip to content

Commit f1a3472

Browse files
committed
fix: filter per-thread archives in RAC online mode (#14)
archGetLogOnline() queried V$ARCHIVED_LOG from minSeq across all threads but created Parser objects for every row, including already-processed archives from the ahead thread. On a 2-node RAC with ~1600 seq divergence this allocated ~1600 unnecessary 1MB Parser chunks, causing OOM. Add per-thread filtering on query results, matching archGetLogPath() and archGetLogList() logic. Verified: RSS dropped from 1012MB to 115MB.
1 parent 4505579 commit f1a3472

2 files changed

Lines changed: 16 additions & 5 deletions

File tree

UPSTREAM-CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Multi-thread redo log processing for Oracle RAC.
3737
| 1a2d316b | LOB INSERT preserved from phantom undo in streaming | Transaction.cpp | [#10](https://github.com/rophy/olr/issues/10) |
3838
| ddf6bc04 | skip truncated URP null bitmap instead of abort | OpCode0504.cpp ||
3939
| 7cbd0580 | decode ROWID column type (type# 69) | Builder.cpp, SysCol.h, BuilderJson.* | [#15](https://github.com/rophy/olr/issues/15) |
40-
|| OOM null-deref crash: throw instead of returning nullptr | Ctx.cpp, MemoryManager.cpp | [#14](https://github.com/rophy/olr/issues/14) |
40+
|| RAC archive query creates parsers for already-processed logs | ReplicatorOnline.cpp | [#14](https://github.com/rophy/olr/issues/14) |
4141

4242
**Upstream PR candidates:** All — these are correctness fixes
4343

src/replicator/ReplicatorOnline.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,7 +1601,9 @@ namespace OpenLogReplicator {
16011601
if (!replicatorOnline->checkConnection())
16021602
return;
16031603

1604-
// Find minimum sequence across all threads for the SQL filter
1604+
// Use minimum sequence across all threads for the SQL filter so the
1605+
// query finds archives for the lagging thread. Per-thread filtering
1606+
// below prevents creating Parsers for already-processed archives.
16051607
Seq minSeq = Seq::none();
16061608
{
16071609
std::unique_lock const lck(replicator->metadata->mtxCheckpoint);
@@ -1610,7 +1612,6 @@ namespace OpenLogReplicator {
16101612
minSeq = state.sequence;
16111613
}
16121614
}
1613-
// Fallback to legacy sequence if threadStates is empty
16141615
if (minSeq == Seq::none())
16151616
minSeq = replicator->metadata->sequence;
16161617

@@ -1639,6 +1640,16 @@ namespace OpenLogReplicator {
16391640

16401641
int ret = stmt.executeQuery();
16411642
while (ret != 0) {
1643+
const auto threadNum = static_cast<uint16_t>(thread);
1644+
1645+
// Filter per-thread: skip archives already processed by this thread.
1646+
// Matches archGetLogPath()/archGetLogList() logic (Replicator.cpp:531-533).
1647+
const Seq threadSeq = replicator->metadata->getSequence(threadNum);
1648+
if (threadSeq != Seq::none() && sequence < threadSeq) {
1649+
ret = stmt.next();
1650+
continue;
1651+
}
1652+
16421653
std::string mappedPath(path.data());
16431654
replicator->applyMapping(mappedPath);
16441655

@@ -1647,8 +1658,8 @@ namespace OpenLogReplicator {
16471658
parser->firstScn = firstScn;
16481659
parser->nextScn = nextScn;
16491660
parser->sequence = sequence;
1650-
parser->thread = static_cast<uint16_t>(thread);
1651-
replicator->archiveRedoQueues[static_cast<uint16_t>(thread)].push(parser);
1661+
parser->thread = threadNum;
1662+
replicator->archiveRedoQueues[threadNum].push(parser);
16521663
ret = stmt.next();
16531664
}
16541665
}

0 commit comments

Comments
 (0)