Design of the TxLog sync protocol

Scope

This document describes the protocol for synchronising the transaction logs of two or more read-write master nodes in a high-availability cluster.

The scope specifically excludes:

  • communication between master and worker nodes;
  • synchronisation between a read-write master and a read-only master;
  • details of solutions based on external implementations of distributed synchronised structures.

Node state variables

Each transaction log consists of two parts, each represented by a queue: the synchronised queue and the incoming queue. The synchronised queue has already been agreed upon by the masters and is an exact replica of the other masters' synchronised queues. The incoming queue is still being negotiated: the masters have yet to agree on the order in which its transactions are to be executed. This order must be agreed upon in order to guarantee eventual consistency between separate data centers.

Synchronised queue

Description: A list of transactions that have already been queued for execution on a master node’s workers.

During normal operation, the synchronised queue is consistent across all peers in the sense that, at any given time, for any given pair of peers, either their synchronised queues are identical, or a suffix (tail) of one queue is a prefix of the other.
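The consistency condition can be expressed as a small predicate. The following is a minimal Python sketch, assuming a synchronised queue is represented as a list of transaction IDs and that queues may be trimmed at the front, so two queues need only overlap. The function names are hypothetical.

```python
from typing import Sequence


def _tail_is_prefix(x: Sequence[str], y: Sequence[str]) -> bool:
    """True if some non-empty suffix (tail) of x is a prefix of y."""
    # Try suffixes from longest (all of x) to shortest (last element only).
    for start in range(len(x)):
        tail = tuple(x[start:])
        if tail == tuple(y[: len(tail)]):
            return True
    return False


def queues_consistent(a: Sequence[str], b: Sequence[str]) -> bool:
    """Two synchronised queues of transaction IDs are consistent if they are
    identical, or a suffix of one is a prefix of the other."""
    if tuple(a) == tuple(b):
        return True
    return _tail_is_prefix(a, b) or _tail_is_prefix(b, a)
```

A lagging node whose queue stops at U10 is consistent with a forerunner at U12, and so are two trimmed windows such as [U5, U6, U7] and [U6, U7, U8]. The quadratic scan is chosen for readability, not speed.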

The ID of the last transaction in the synchronised queue, the merge base, is used as a key for matching messages in synchronisation rounds (see Normal operation).

Incoming queue

Description: The queue of incoming transactions, each of which may either have been created locally at a client's request or have been received from another node.

Each transaction is listed with its ID, timestamp, and originating node ID. The transactions are sorted first by timestamp and then by transaction ID. Transaction IDs are globally unique and are assigned to transactions upon creation at the node of origin.
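The sort order can be captured directly in the entry type. This is a minimal sketch that assumes integer timestamps and string IDs; the class and field names are illustrative, not taken from the implementation. With `order=True`, dataclass instances compare field by field in declaration order, and `compare=False` keeps the originating node out of comparisons.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True, order=True)
class IncomingTx:
    """Incoming-queue entry (hypothetical names).

    Instances compare as (timestamp, tx_id), so sorting orders first by
    timestamp and then by transaction ID. origin_node is excluded from
    comparisons because it plays no part in the ordering.
    """
    timestamp: int
    tx_id: str
    origin_node: str = field(compare=False)


queue = sorted([
    IncomingTx(5, "t-b", "m2"),
    IncomingTx(3, "t-z", "m1"),
    IncomingTx(5, "t-a", "m3"),
])
print([tx.tx_id for tx in queue])  # ['t-z', 't-a', 't-b']
```

Because transaction IDs are globally unique, ties on timestamp are always broken deterministically, so every node sorts the same set of transactions identically.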

Normal operation

Every participating master node repeatedly executes synchronisation rounds, each round consisting of the following steps:

  1. POST to all other known participating masters a message containing:
    • merge base (the ID of the last synchronised transaction);
    • the node’s current timestamp counter;
    • the current contents of its incoming queue (local queue).
  2. COLLECT the incoming queues (foreign queues) and timestamp counters from incoming messages POST-ed by all other known participating masters for the same merge base.
  3. ADD to the synchronised queue the longest common prefix of the local queue and all collected foreign queues, restricted to transactions whose timestamps do not exceed the least (i.e., earliest) of the collected timestamp counters.
  4. MERGE into the local queue all previously unknown transactions from the foreign queues, each in its proper sorted position.
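The steps above can be sketched for a single round as follows. This is a minimal, non-authoritative Python sketch: the `SyncMessage` fields, the `(timestamp, tx_id)` tuple representation and the function names are assumptions, and removing agreed transactions from the incoming queue is inferred from the example operation rather than stated in the steps.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Assumed representation: an incoming-queue entry is a (timestamp, tx_id)
# tuple, so plain tuple ordering is "timestamp, then transaction ID".
Tx = Tuple[int, str]


@dataclass
class SyncMessage:
    """Status POSTed in step 1 (hypothetical field names)."""
    merge_base: str          # ID of the last synchronised transaction
    timestamp_counter: int   # sender's current timestamp counter
    incoming: List[Tx]       # sender's incoming queue, sorted


def add_step(local: List[Tx], foreign: List[SyncMessage]) -> List[Tx]:
    """Step 3: longest common prefix of the local and all foreign queues,
    limited to transactions no later than the least collected counter.
    Assumes at least one foreign message was collected."""
    limit = min(m.timestamp_counter for m in foreign)
    agreed: List[Tx] = []
    for entries in zip(local, *(m.incoming for m in foreign)):
        head = entries[0]
        # Stop where the queues first disagree, or at the first transaction
        # that is newer than some peer's timestamp counter.
        if any(e != head for e in entries) or head[0] > limit:
            break
        agreed.append(head)
    return agreed


def merge_step(local: List[Tx], foreign: List[SyncMessage]) -> List[Tx]:
    """Step 4: merge unknown foreign transactions into their sorted places."""
    known = set(local)
    for m in foreign:
        known.update(m.incoming)
    return sorted(known)


def run_round(sync: List[str], local: List[Tx],
              received: List[SyncMessage]) -> List[Tx]:
    """Steps 2-4 of one round; appends to sync and returns the new local queue."""
    base = sync[-1] if sync else ""
    # Step 2: only messages for the same merge base take part in this round.
    foreign = [m for m in received if m.merge_base == base]
    if not foreign:
        return local
    agreed = add_step(local, foreign)
    sync.extend(tx_id for _, tx_id in agreed)
    # Assumption: agreed transactions leave the incoming queue.
    done = set(agreed)
    return [tx for tx in merge_step(local, foreign) if tx not in done]
```

`zip` stops at the shortest queue, so a node that has not yet seen a transaction automatically ends the common prefix at that point. The timestamp limit keeps a transaction out of the synchronised queue while some peer could still produce an earlier one.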

Example operation:

[Figure: example operation (TransactionLog_exampleOperation.png)]

Initial state. The following example shows the incoming and synchronised queues of three masters. In the initial state, the synchronised queues contain U1-U10, and the masters have (A, B), (C) and (D, E), respectively, as incoming updates.

[Figure: initial state (initalState.png)]

After the COLLECT step, each master's incoming queue is updated with the updates from the other masters. Because the steps run asynchronously, some masters may not yet have received all the updates (in this case, D and E do not reach every master). After this step, the longest common prefix of the incoming queues is (A, B, C), which is used in the next ADD step.

[Figure: initialState2.png]

The final diagram shows (A, B, C) added to the synchronised queue, while the incoming queue holds the agreed (D, E) and two new updates, G and F, which arrived asynchronously. The system is now ready to perform the ADD step for (D, E).
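The ADD step of this example can be reproduced in a few lines. This is an illustrative Python sketch that assumes one master has not yet received D and E (which master does not affect the result) and ignores timestamp counters, assuming all of them are already past E. `longest_common_prefix` is a hypothetical helper.

```python
from typing import List, Sequence


def longest_common_prefix(queues: Sequence[Sequence[str]]) -> List[str]:
    """Longest sequence of transaction IDs that starts every queue."""
    prefix: List[str] = []
    for entries in zip(*queues):
        if len(set(entries)) != 1:
            break
        prefix.append(entries[0])
    return prefix


# Incoming queues after COLLECT: one master is still missing D and E.
after_collect = [
    ["A", "B", "C"],
    ["A", "B", "C", "D", "E"],
    ["A", "B", "C", "D", "E"],
]

sync_queue = [f"U{i}" for i in range(1, 11)]   # U1..U10
agreed = longest_common_prefix(after_collect)   # ['A', 'B', 'C']
sync_queue.extend(agreed)
print(sync_queue[-4:])                          # ['U10', 'A', 'B', 'C']
```

D and E stay in the incoming queues until every master has them, at which point a later round can add them.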

Exceptions

Timeout

IF
One or more nodes did not post any status within the round’s allotted time.
THEN
Complete the round between the nodes that did post status using the last known timestamp counters for the missing nodes.

If all remaining known transactions have timestamps past the last known counters of the missing nodes, no transaction can be added, and there will be a sequence of “empty” rounds until the missing nodes reappear. If no progress is made within a specified amount of time, the remaining nodes continue synchronisation without the missing ones.
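The timeout rule affects only which counters bound the ADD step. This is a minimal Python sketch: `round_limit`, its parameters and the idea of measuring stall time since the last productive round are assumptions. `give_up_after` stands for the unspecified "specified amount of time".

```python
from typing import Dict


def round_limit(posted: Dict[str, int], last_known: Dict[str, int],
                stalled_for: float, give_up_after: float) -> int:
    """Least timestamp counter bounding the ADD step when some peers timed out.

    posted:        counters from peers that posted in this round
    last_known:    last counter received from every known peer
    stalled_for:   seconds since a round last added anything to the sync queue
    give_up_after: the "specified amount of time" (an assumed parameter)
    Assumes at least one peer posted in this round.
    """
    counters = dict(posted)
    if stalled_for <= give_up_after:
        # Missing peers still bound the round through their last known
        # counters, which may produce "empty" rounds until they reappear.
        for peer, counter in last_known.items():
            counters.setdefault(peer, counter)
    # Otherwise the missing peers are left out and synchronisation continues.
    return min(counters.values())
```

For example, with `m2` posting counter 40 and `m3` missing at a last known counter of 20, the limit stays at 20 until the stall exceeds `give_up_after`, after which it rises to 40.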

Laggers

IF
One or more nodes reported merge bases that are in the tails of the synchronised queues of other nodes. (All synchronised queues are still consistent with one another, i.e., any two are either identical, or the tail of one is a prefix of the other.)
THEN
One or more of the forerunners update the laggers with the most up-to-date version of the synchronised queue.
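A forerunner can work out what a lagger is missing from the lagger's reported merge base. This Python sketch assumes the update is sent as the entries following the merge base rather than as the whole queue; `catch_up_entries` is a hypothetical name.

```python
from typing import List, Optional


def catch_up_entries(forerunner_sync: List[str],
                     lagger_base: str) -> Optional[List[str]]:
    """Entries the lagger is missing: everything after its merge base in the
    forerunner's synchronised queue.

    Returns None if the merge base is not in the forerunner's queue (for
    example, because that part was already trimmed), in which case this
    sketch cannot produce an incremental update.
    """
    try:
        pos = forerunner_sync.index(lagger_base)
    except ValueError:
        return None
    return forerunner_sync[pos + 1:]
```

An empty result means the lagger has already caught up. A `None` result would require a fuller resynchronisation, which this sketch does not cover.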

Additional notes

Post/Collect timing and matching

A node should be prepared to collect statuses that other nodes post before it has posted its own. It is also possible that one node's synchronisation round will include a different set of messages from that of a peer's round. Even so, the nodes are still guaranteed to reach agreement under normal operation, as long as the timestamp counters are properly processed and maintained.
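Accepting statuses before posting one's own amounts to buffering them by merge base. This is a minimal Python sketch with a hypothetical `Mailbox` class. The status payload is left opaque.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Dict


class Mailbox:
    """Hypothetical inbox for status messages, keyed by merge base.

    Messages may arrive before this node has posted its own status, or for
    a merge base this node has not reached yet; both are simply buffered.
    """

    def __init__(self) -> None:
        # merge base -> {peer ID -> latest status from that peer}
        self._by_base: DefaultDict[str, Dict[str, Any]] = defaultdict(dict)

    def deliver(self, merge_base: str, peer: str, status: Any) -> None:
        # A newer status from the same peer for the same base replaces the older one.
        self._by_base[merge_base][peer] = status

    def collect(self, merge_base: str) -> Dict[str, Any]:
        """Remove and return everything buffered for this merge base."""
        return self._by_base.pop(merge_base, {})
```

Each node collects whatever happens to be buffered when its round runs, which is exactly how two peers' rounds can end up with different message sets. A real implementation would presumably also discard buffers for merge bases the node has already passed.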

Idle/Busy mode

While there are unsynchronised transactions in the incoming queues, the nodes should execute synchronisation rounds back-to-back (busy mode) in order to synchronise the queues in the shortest possible time.

There may be periods of no or low activity, during which the nodes exchange empty incoming queues (idle rounds). In such cases, the nodes should enter idle mode and execute synchronisation rounds less often (e.g., once a second to once a minute).

Nodes should switch to idle mode on the first idle round, and back to busy mode on the first non-empty status posted by any peer, or on an incoming transaction.
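The switching rule is a two-state machine. This is a minimal Python sketch: `RoundScheduler` and its methods are hypothetical, and the 5-second idle delay is one arbitrary choice within the suggested range.

```python
from dataclasses import dataclass


@dataclass
class RoundScheduler:
    """Hypothetical scheduler implementing the idle/busy switching rule.

    Delays are in seconds. busy_delay=0 means back-to-back rounds; the idle
    delay is one arbitrary choice within the suggested 1 s to 60 s range.
    """
    busy_delay: float = 0.0
    idle_delay: float = 5.0
    busy: bool = True

    def after_round(self, all_queues_empty: bool) -> float:
        """Called when a round ends; returns the delay before the next round."""
        # The first idle round (only empty incoming queues exchanged) switches
        # to idle mode; any non-empty queue keeps the node busy.
        self.busy = not all_queues_empty
        return self.next_delay()

    def on_peer_status(self, non_empty: bool) -> None:
        if non_empty:
            self.busy = True       # first non-empty status from any peer

    def on_incoming_transaction(self) -> None:
        self.busy = True           # new transaction from a local client

    def next_delay(self) -> float:
        return self.busy_delay if self.busy else self.idle_delay
```

In practice, switching to busy mode should also cut short any idle wait that is already in progress. That timer handling is left out here.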

Transaction exchange

Only minimal information about each transaction is exchanged in the synchronisation protocol. This prevents bulky updates (e.g., large INSERT statements) from clogging the protocol with data that is irrelevant to synchronisation. Nodes acquire transaction data from the originating peer asynchronously through a separate interface (whose details are irrelevant to the synchronisation protocol).

Acquired transactions only

Any node should be able to execute a transaction that has already been merged into the synchronised queue. To guarantee this, nodes only post transactions that they have been able to acquire (see Transaction exchange).
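The acquisition rule is a filter applied before posting. This Python sketch assumes `(timestamp, tx_id)` entries and a set of acquired transaction IDs. Both the representation and the name `postable` are hypothetical.

```python
from typing import Iterable, List, Set, Tuple

Tx = Tuple[int, str]  # (timestamp, transaction ID); representation is an assumption


def postable(incoming: Iterable[Tx], acquired_ids: Set[str]) -> List[Tx]:
    """Transactions to include in this node's POST: only those whose data
    has already been acquired, preserving the queue's sort order."""
    return [tx for tx in incoming if tx[1] in acquired_ids]
```

Together with the ADD step, this gives the guarantee: a transaction that some node has not yet acquired is missing from that node's post, so the common prefix ends before it, and it cannot enter the synchronised queue until every node holds its data.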

Known issues

  • Spurious failures (updates failing on one worker and succeeding on another) can cause the DCs to go out of sync (workers within the same DC will still be in sync);
  • Dead workers remain undetected until an update is executed.

Operational requirements

  • Each synchronising master must have the following properties set:
    • Node ID (cluster config: node-id; JMX bean: NodeID) that is unique and consistent throughout the whole cluster, across DCs;
    • master URL (cluster config: master-url; JMX bean: masterURL) set to its RDF4J protocol URL.
  • All synchronised peers must be configured in a master node’s config with the following properties:
    • ID (cluster config: peer.<n>.id) equal to the peer’s own unique Node ID;
    • URL (cluster config: peer.<n>.url) equal to the peer’s RDF4J protocol URL.

The properties can be set directly in the cluster config, or given as arguments to the JMX bean or addSyncPeer(). A master node will not synchronise with unknown (i.e., unconfigured) peers.
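The requirements above can be checked mechanically. This is a minimal Python sketch that assumes the cluster config has already been read into a flat dictionary keyed by the property names listed above. The parsing step, the function names and the example URLs are all hypothetical.

```python
import re
from typing import Dict

PEER_ID_KEY = re.compile(r"peer\.([^.]+)\.id")


def configured_peers(config: Dict[str, str]) -> Dict[str, str]:
    """Map peer ID -> RDF4J protocol URL, validating the required properties."""
    for required in ("node-id", "master-url"):
        if not config.get(required):
            raise ValueError(f"missing required property: {required}")
    peers: Dict[str, str] = {}
    for key, peer_id in config.items():
        match = PEER_ID_KEY.fullmatch(key)
        if not match:
            continue
        url_key = f"peer.{match.group(1)}.url"
        if not config.get(url_key):
            raise ValueError(f"{url_key} is not set")
        # Node IDs must be unique across the whole cluster.
        if peer_id == config["node-id"] or peer_id in peers:
            raise ValueError(f"peer ID is not unique: {peer_id}")
        peers[peer_id] = config[url_key]
    return peers


def accepts_sync_from(peers: Dict[str, str], sender_id: str) -> bool:
    # A master does not synchronise with unknown (unconfigured) peers.
    return sender_id in peers


cfg = {
    "node-id": "m1",
    "master-url": "http://m1.example.com/repositories/master",
    "peer.1.id": "m2",
    "peer.1.url": "http://m2.example.com/repositories/master",
}
peers = configured_peers(cfg)
print(accepts_sync_from(peers, "m2"), accepts_sync_from(peers, "m9"))  # True False
```

This sketch does not cover peers added at runtime via the JMX bean or addSyncPeer(); they would need to pass the same uniqueness check.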