Tuesday, August 19, 2008

Using protobuf for designing and implementing replication in Drizzle

So, following the lead of Brian, I spent a few hours of the weekend creating a very simple replication scheme for Drizzle, using protobuf to specify the binary log events.

Since we are developing replication for a cloud, there are a few things we have to consider:

  • Servers are unreliable. We shall not trust servers, but shall expect them to crash at the worst possible time (Murphy is a very good friend of mine, you know. He must be, since he visits me very often.) This means that we need to allow statements to be sent to the slaves before the transaction is complete, which in turn means that we need to support interleaving and hence both commit and rollback events.

    In order to handle interleaving, we need to have a transaction id, but in order to handle session specific data in the event of a crash (for example, temporary tables), we need to have a session id as well. However, the session id is only needed for statements, not for other events, so we add it there. This will allow the slave to expire any session objects when necessary.

    Since we cannot always know if the transaction is complete when a statement has been executed, we need to have the commit and rollback events as separate events, instead of using the alternative approach of adding flags to each query event.

  • Reconnections are frequent. Since masters go up and down all the time, we have to do what we can to make reconnecting to another master easy. Among other things, it means that we cannot interrogate a slave's master after that master has crashed to figure out where we should start replication, so we need some form of Global Transaction ID to be able to decide where to start replication when connecting to another master.

    In our case, we want the transaction id to be transferable to other servers as well, so we combine the server id and the transaction id to form the Global Transaction ID for this replication.

    Since reconnections are frequent, we also need techniques for resolving conflicts between events, and a timestamp is one such technique. To handle that, we add a timestamp to each event, and we make room for a nanosecond-precision timestamp immediately, meaning that we need at least 64 bits for it.

  • Network is not reliable. We expect the cloud to be spread all over the planet, so we cannot really trust the network to provide a reliable transport. This means that we need some form of checksum on each event to ensure that it was transferred correctly.
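As a rough illustration of the per-event checksum mentioned in the last item, here is a minimal Python sketch. The frame layout (a 4-byte length, the payload, and a CRC-32 trailer) and the names `frame_event` and `unframe_event` are assumptions made for this example only; they are not part of the Drizzle format or of the protobuf specification in this post.

```python
import struct
import zlib

# Hypothetical frame layout (an assumption, not Drizzle's actual format):
#   4-byte big-endian payload length | payload | 4-byte big-endian CRC-32
_LEN = struct.Struct(">I")
_CRC = struct.Struct(">I")


def frame_event(payload: bytes) -> bytes:
    """Wrap a serialized event in a length prefix and a CRC-32 trailer."""
    return _LEN.pack(len(payload)) + payload + _CRC.pack(zlib.crc32(payload))


def unframe_event(frame: bytes) -> bytes:
    """Return the payload, or raise ValueError if the frame is damaged."""
    if len(frame) < _LEN.size + _CRC.size:
        raise ValueError("frame too short")
    (length,) = _LEN.unpack_from(frame, 0)
    end = _LEN.size + length
    if len(frame) != end + _CRC.size:
        raise ValueError("length mismatch")
    payload = frame[_LEN.size:end]
    (expected,) = _CRC.unpack_from(frame, end)
    if zlib.crc32(payload) != expected:
        raise ValueError("checksum mismatch")
    return payload
```

A CRC-32 adds only four bytes per event and is cheap to compute, which is why it is used in this sketch instead of a cryptographic digest.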
Now, for the first simple implementation, we're aiming at statement-based replication, which means that there has to be a way to transfer session context information. Remember that statement-based replication just sends the statements that change the state of the database, but the effect of a statement can also depend on the context of the session. For example, the INSERT statement below cannot be reliably replicated without the value of the user variable @foo.
SET @foo = 'Hello world!';
INSERT INTO whatever VALUES (@foo);
This is already handled in MySQL Replication by preceding each query log event with a sequence of context events, but for this solution there is another approach that is more appropriate: adding a set of name-value pairs to the query event. Another problem is that there are functions that are non-deterministic, or context dependent in other ways, but these can be handled by rewriting the queries as follows:

Instead of...
INSERT INTO info VALUES (UUID(), 47);

...use this:
SET @tmp = UUID();
INSERT INTO info VALUES (@tmp, 47);
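To make the name-value approach described above concrete, the following Python sketch shows how a slave might turn a query event and its attached variables back into statements. `QueryEvent`, `quote`, and `replay_statements` are hypothetical names for this illustration, and the quoting is deliberately simplified; a real implementation would use the server's own quoting rules.

```python
from dataclasses import dataclass, field


@dataclass
class QueryEvent:
    """Hypothetical in-memory stand-in for a query event with its context."""
    query: str
    variables: dict = field(default_factory=dict)  # name -> string value


def quote(value: str) -> str:
    """Quote a string literal by doubling embedded single quotes (simplified)."""
    return "'" + value.replace("'", "''") + "'"


def replay_statements(event: QueryEvent) -> list:
    """Return the SET statements for the context, followed by the query."""
    sets = [
        f"SET @{name} = {quote(value)}"
        for name, value in sorted(event.variables.items())
    ]
    return sets + [event.query]


event = QueryEvent("INSERT INTO whatever VALUES (@foo)", {"foo": "Hello world!"})
for statement in replay_statements(event):
    print(statement + ";")
```

Because the variables travel inside the event itself, the slave does not have to match separate context events to the query they belong to.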

Now, the protobuf specification for all the above items, including some events used to control the storage of the binary log, is:

package BinaryLog;

message Header {
  required fixed64 timestamp = 1;
  required uint32 server_id = 2;
  required uint32 trans_id = 3;
}

message Start {
  required Header header = 1;
  required uint32 server_version = 2;
  required string server_signature = 3;
}

message Chain {
  required Header header = 1;
  required uint32 next = 2;            // Sequence number of next file
}

message Query {
  message Variable {
    required string name = 1;
    required string value = 2;
  }

  required Header header = 1;
  repeated Variable variable = 2;
  required uint32 session_id = 3;
  required string query = 4;
}

message Commit {
  required Header header = 1;
}

message Rollback {
  required Header header = 1;
}
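To show how separate commit and rollback events let a slave deal with interleaved transactions, here is a small Python sketch. It keys pending statements on the pair (server_id, trans_id) from the header, which is the Global Transaction ID described earlier. The `Applier` class and its method names are hypothetical, and buffering statements until the commit arrives is only one possible strategy, assumed here for simplicity.

```python
from collections import defaultdict


class Applier:
    """Buffers statements per global transaction id until commit or rollback."""

    def __init__(self):
        self.pending = defaultdict(list)  # (server_id, trans_id) -> statements
        self.applied = []                 # statements in commit order

    def on_query(self, server_id, trans_id, query):
        # Statements from different transactions may arrive interleaved.
        self.pending[(server_id, trans_id)].append(query)

    def on_commit(self, server_id, trans_id):
        # Only a commit event makes the buffered statements take effect.
        self.applied.extend(self.pending.pop((server_id, trans_id), []))

    def on_rollback(self, server_id, trans_id):
        # A rollback event discards everything buffered for the transaction.
        self.pending.pop((server_id, trans_id), None)


applier = Applier()
applier.on_query(1, 7, "INSERT INTO t VALUES (1)")
applier.on_query(1, 8, "INSERT INTO t VALUES (2)")
applier.on_rollback(1, 7)
applier.on_commit(1, 8)
print(applier.applied)
```

If the master crashes in the middle of a transaction, the statements left in `pending` are never committed, so the slave can simply drop them.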
After tossing together a reader and a writer for the format, the result is:
$ ./binlog_writer --trans 1 \
> --set nick=mkindahl --set name='Mats Kindahl' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=krow --set name='Brian Aker' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=mtaylor --set name='Monty Taylor' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_reader
# Global Id: (1,0)
# Timestamp: 484270929829911
set @name = 'Mats Kindahl'
set @nick = 'mkindahl'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484330886264299
set @name = 'Brian Aker'
set @nick = 'krow'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484391458447787
set @name = 'Monty Taylor'
set @nick = 'mtaylor'
INSERT INTO whatever VALUES (@nick,@name)
$ ls -l log.bin
-rw-r--r-- 1 mats bazaar 311 2008-08-19 14:22 log.bin
Protobuf Rocks! You can find the branch containing the ongoing development of this at Launchpad. Right now, there are no changes to the server, since we want the format to be stable first, so the branch is merged on a regular basis to the main tree as well.


burtonator said...

I don't actually think you need a global transaction ID to reconnect to another master.

I came up with a fun hack to make multi-master replication work with slave promotion.

All you have to do is make sure that the master and all the slaves begin at the exact same log positions and replicate this way moving forward.

Then you can just start replicating at the same binary position on another host.

Also, I agree regarding unreliable networks:

... see this:


If you're going to work on things from the ground up you need to add in hashcodes (but potentially with the ability to disable them).

Assume the network is going to corrupt your packets....

Mats Kindahl said...

The scheme you suggest works assuming that there are no insertions into the slave except those from a single master. As soon as we allow more than one master to a slave, or allow updates of the data on the slave, we will have problems.

The main goal of the checksum is to catch minor errors but, more importantly, to allow reading the right number of bytes from the stream. Once you know that you have read a message, it is possible to add other fields to include, e.g., a digest/hash.

Although a good idea, using an MD5 (16 bytes) or a SHA-1 (20 bytes) is a little overkill for normal usage, since it both adds a significant computing step and adds a lot to the message size.

Having it optional, as you suggest, or applying it to streaks of messages instead of individual messages is something that could be done.