我花了一些时间研究源代码,并试图自己回答这个问题。
-
postgresql只包含提交的事务,忽略中止的事务。
相关代码路径和代码段:
pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() ->
ReorderBufferCommit() -> ReorderBufferIterTXNNext()
DecodeXactOp():
switch (info)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
-
内置的逻辑apply worker不重放SQL,而是使用许多内部API在接收方应用更改。
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
PG10的逻辑复制使用
pgoutput
作为输出插件,采用二进制格式,适合直接进入数据。
https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c