Friday, December 18, 2009

MySQL Replicant: a library for controlling replication deployments

Keeping a MySQL installation up and running can be quite tricky at times, especially when having many servers to manage and monitor. In the replication tutorials at the annual MySQL Users' Conference, we demonstrate how to set up replication appropriately and also how to handle various issues that can arise. Many of these procedures are routine: bring down the server, edit the configuration file, bring the server up again, start a mysql client and add a user, etc.

It has always annoyed me that these procedures are perfect candidates for automation, but that we do not have the necessary interfaces to manipulate an entire installation of MySQL servers.

If there were an interface with a relatively small set of primitives (redirecting servers, bringing servers down, adding a line to the configuration file, etc.), it would be possible to create pre-canned procedures that can just be executed.

To that end, I started writing a library that provides an interface like this. Although I am more familiar with Perl, I picked Python for this project, since it seems to be widely used by database administrators (it's just a feeling I have; I have no figures to support it). Just to have a cool name for the library, we call it MySQL Replicant, and it is (of course) available at Launchpad.

So what do we want to achieve with a library like this? Well... the goal is to provide a generic interface to complete installations and thereby make administration of large installations easy.

Such an interface allows procedures to be described in an executable format, namely as Python scripts.

In addition to making it easy to implement common tasks for experienced database administrators, it also promotes sharing by providing a way to write complete scripts for solving common problems. Having a pool of such scripts makes it easier for newcomers to get up and running.

The basic idea is that you create a model of the installation on a computer and then manipulate the model. When doing these manipulations, the appropriate commands—either as SQL commands to a running server or shell commands to the host where the server is running—will then be sent to the servers in the installation to configure them correctly.

So, to take a small example, how does the code for redirecting a bunch of servers to a master look?

import mysqlrep, my_servers
for slave in my_servers.slaves:
   mysqlrep.change_master(slave, my_servers.master)
In this case, the installation is defined in a separate file and is imported as a Python module. Right now, the interface for specifying a topology is quite rough, but this is going to change.
from mysqlrep import Server, User, Linux

servers = [Server(server_id=1, host="server1.example.com",
                  sql_user=User("mysql_replicant", "xyzzy"),
                  ssh_user=User("mysql_replicant"),
                  machine=Linux()),
           Server(server_id=2, host="server2.example.com",
                  sql_user=User("mysql_replicant", "xyzzy"),
                  ssh_user=User("mysql_replicant"),
                  machine=Linux()),
           Server(server_id=3, host="server3.example.com",
                  sql_user=User("mysql_replicant", "xyzzy"),
                  ssh_user=User("mysql_replicant"),
                  machine=Linux()),
           Server(server_id=4, host="server4.example.com",
                  sql_user=User("mysql_replicant", "xyzzy"),
                  ssh_user=User("mysql_replicant"),
                  machine=Linux())]
master = servers[0]
slaves = servers[1:]
Here, the Server class represents a server and, to be able to do its job, it needs one MySQL account on the server and one shell account on the host machine. Right now, it is also necessary to specify the server ID, but the plan is to just require the host, port, socket, SQL account name, and SSH account information. The remaining information can then be fetched from the configuration file of the server. Each server has a small set of primitives on top of which everything else is built:
Server.sql(SQL command)
Execute the SQL command and return a result set.
Server.ssh(command list)
Execute the command given by the command list and return an iterator to the result output.
Server.start()
Start the server.
Server.stop()
Stop the server.
There is a small set of commands defined on top of these primitives that can be used. Here is a list of just a few of them, but there are some more in the library at Launchpad.
change_master(slave, master, position=None)
Change the master of slave to be master and start replicating from position.
fetch_master_pos(server)
Fetch the master position of server, which is the position where the last executed statement ends in the binary log.
fetch_slave_pos(server)
Fetch the slave position of server, which is the position where the last executed event ends.
flush_and_lock_database(server)
Flush all tables on server and lock the database for read.
unlock_database(server)
Unlock a previously locked database.
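To give a feeling for how a command can be layered on the primitives, here is a minimal sketch of a change_master built only on an sql() call. This is not the code in the library: RecordingServer is a hypothetical stand-in that records statements instead of sending them, and passing the position as a (file, offset) pair is my assumption for the example.

```python
class RecordingServer:
    """Hypothetical stand-in for a server model; records SQL instead of running it."""

    def __init__(self, host, port=3306):
        self.host = host
        self.port = port
        self.statements = []

    def sql(self, command):
        # A real model would send the command to the running server.
        self.statements.append(command)


def change_master(slave, master, position=None):
    """Point slave at master, optionally from an assumed (file, offset) position."""
    parts = ["MASTER_HOST='%s'" % master.host, "MASTER_PORT=%d" % master.port]
    if position is not None:
        log_file, log_pos = position
        parts.append("MASTER_LOG_FILE='%s'" % log_file)
        parts.append("MASTER_LOG_POS=%d" % log_pos)
    slave.sql("STOP SLAVE")
    slave.sql("CHANGE MASTER TO " + ", ".join(parts))
    slave.sql("START SLAVE")


master = RecordingServer("server1.example.com")
slave = RecordingServer("server2.example.com")
change_master(slave, master, position=("master-bin.000001", 4))
for statement in slave.statements:
    print(statement)
```

Because the command only talks to the model through sql(), the same function works against a recording stub, which makes procedures easy to dry-run before pointing them at real servers.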
Using these primitives, it is easy to clone a master by executing the code below. For this example, I use the quite naive method of backing up a database by creating an archive of the database files and copying them to the new slave.
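from mysqlrep import (flush_and_lock_database, unlock_database,
                      fetch_master_pos, change_master, start_replication)
from subprocess import call

# master and slave are Server objects from the model of the installation.
backup_name = "backup.tar.gz"    # placeholder name for the archive

flush_and_lock_database(master)
position = fetch_master_pos(master)    # binlog position the backup corresponds to
master.ssh("tar Pzcf " + backup_name + " /usr/var/mysql")
unlock_database(master)
call(["scp", master.host + ":" + backup_name, slave.host + ":."])
slave.stop()
slave.ssh("tar Pzxf " + backup_name + " /usr/var/mysql")
slave.start()
change_master(slave, master, position)  # replicate from where the backup was taken
start_replication(slave)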
What do you think? Would this be a valuable project to pursue?

Thursday, December 17, 2009

Using mysqld_multi on Karmic

I wanted to set up several servers on my machine using the Ubuntu distribution and control them using mysqld_multi, the typical way to manage several servers on one machine. However, I also wanted to use MySQL 5.1 and not 5.0, which is the default on Jaunty (Ubuntu 9.04). About a month ago, I upgraded to Karmic Koala, and one of the reasons was that MySQL 5.1 is used by default. Even though I could install the latest revision all the time, I usually want to use the real distributions for my private projects for a number of reasons.

I actually tried to upgrade to MySQL 5.1 on Ubuntu 9.04, but I discovered that all kinds of applications had dependencies on MySQL 5.0, so I held off on upgrading at that time.

Anyway, the procedure for installing multiple servers on the same machine is this:

  1. Shut down the running server.

    This is, strictly speaking, not necessary unless you are going to edit the options for the running server, but I do this as a precaution.

  2. Edit your my.cnf configuration file and add sections for mysqld_multi and the new servers.

    I wanted to add four servers to play with, not counting the one that is already installed and running, so I added sections mysqld1 to mysqld4. Also add a section for mysqld_multi.

  3. Create server directories and database files using mysql_install_db.

    The new servers need to be bootstrapped so that they have all the necessary databases and tables set up.

  4. Optionally: install an init.d script that uses mysqld_multi.

    This is currently not very well supported in Debian (there is actually a comment saying that it is not supported), so I skipped this step. If you feel adventurous, you can always copy /usr/share/mysql/mysqld_multi.server to /etc/init.d/mysql.server as they suggest in the file, but I will not do it, nor recommend it (because I haven't tried it).

  5. Start the installed server(s).

    Well, not much to say here.

So, on my way, I edited /etc/mysql/my.cnf and added the necessary sections. (You can see a diff of that below.)

The important options to add are server-id so that each server gets a unique server id (I'm going to replicate between them), port and socket so that you can connect to each of them both when you're on the local machine and from another machine, and pid-file to give each server a unique pid file name (this is important, since the default will not work at all).
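Since the sections differ only in the server number, they can also be generated. The following is a rough sketch, not something I used for this setup: the function names are made up for the example, and the paths and the starting port simply follow the layout in the diff further down.

```python
def server_options(n, first_port=3307):
    """Per-server options for section [mysqldN]; every value must be unique."""
    return {
        "server-id": str(n),
        "pid-file": "/var/run/mysqld/mysqld%d.pid" % n,
        "socket": "/var/run/mysqld/mysqld%d.sock" % n,
        "port": str(first_port + n - 1),
        "datadir": "/var/lib/mysql%d" % n,
    }


def render_section(n, options):
    """Format one [mysqldN] section in my.cnf syntax."""
    lines = ["[mysqld%d]" % n]
    lines += ["%-10s = %s" % (key, value) for key, value in options.items()]
    return "\n".join(lines)


# Four extra servers, as in the text: mysqld1 to mysqld4.
config = "\n\n".join(render_section(n, server_options(n)) for n in range(1, 5))
print(config)
```

Generating the sections this way avoids the copy-and-paste slips that are easy to make when editing four nearly identical blocks by hand.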

Next step is to install the data directories for the servers, which should be trivial:

$ sudo mysql_install_db --user=mysql --datadir=/var/lib/mysqlfoo --basedir=/usr
Installing MySQL system tables...
091120  9:40:23 [Warning] Can't create test file /var/lib/mysqlfoo/romeo.lower-test
091120  9:40:23 [Warning] Can't create test file /var/lib/mysqlfoo/romeo.lower-test
ERROR: 1005  Can't create table 'db' (errno: 13)
091120  9:40:23 [ERROR] Aborting

091120  9:40:23 [Warning] Forcing shutdown of 2 plugins
091120  9:40:23 [Note] /usr/sbin/mysqld: Shutdown complete


Installation of system tables failed!  Examine the logs in
/var/lib/mysqlfoo for more information.
    .
    .
    .
OK, the warning is a warning, but it seems I forgot the permissions on the directory. Checking the write permissions, no problems. Hmmm... checking that I can create the directories and files manually as the mysql user, no problems(!)

What on earth is going on?

After some digging around, I found bug #201799 which quite clearly explains that what I thought was a permission problem is actually AppArmor doing its job.

So updating the AppArmor configuration file /etc/apparmor.d/usr.sbin.mysqld with the following changes solved the problem, and I could get on with installing the servers.

diff --git a/apparmor.d/usr.sbin.mysqld b/apparmor.d/usr.sbin.mysqld
index f9f1a37..7a94861 100644
--- a/apparmor.d/usr.sbin.mysqld
+++ b/apparmor.d/usr.sbin.mysqld
@@ -21,10 +25,20 @@
   /etc/mysql/my.cnf r,
   /usr/sbin/mysqld mr,
   /usr/share/mysql/** r,
   /var/log/mysql.log rw,
   /var/log/mysql.err rw,
+  /var/log/mysql[1-9].log rw,
+  /var/log/mysql[1-9].err rw,
   /var/lib/mysql/ r,
   /var/lib/mysql/** rwk,
+  /var/lib/mysql[1-9]/ r,
+  /var/lib/mysql[1-9]/** rwk,
   /var/log/mysql/ r,
   /var/log/mysql/* rw,
+  /var/log/mysql[1-9]/ r,
+  /var/log/mysql[1-9]/* rw,
   /var/run/mysqld/mysqld.pid w,
   /var/run/mysqld/mysqld.sock w,
+  /var/run/mysqld/mysqld[1-9].pid w,
+  /var/run/mysqld/mysqld[1-9].sock w,
 }

Changes to /etc/mysql/my.cnf

Here is a unified diff of the changes I made to /etc/mysql/my.cnf to add some more servers.
$ git diff mysql/my.cnf
--- a/mysql/my.cnf
+++ b/mysql/my.cnf
@@ -111,7 +111,46 @@ max_binlog_size         = 100M
 # ssl-cert=/etc/mysql/server-cert.pem
 # ssl-key=/etc/mysql/server-key.pem
 
+[mysqld_multi]
+mysqld         = /usr/bin/mysqld_safe
+mysqladmin     = /usr/bin/mysqladmin
+user           = root
 
+[mysqld1]
+server-id      = 1
+pid-file = /var/run/mysqld/mysqld1.pid
+socket  = /var/run/mysqld/mysqld1.sock
+port  = 3307
+datadir = /var/lib/mysql1
+log-bin        = /var/lib/mysql1/mysqld1-bin.log
+log-bin-index  = /var/lib/mysql1/mysqld1-bin.index
+
+[mysqld2]
+server-id      = 2
+pid-file = /var/run/mysqld/mysqld2.pid
+socket  = /var/run/mysqld/mysqld2.sock
+port  = 3308
+datadir = /var/lib/mysql2
+log-bin        = /var/lib/mysql2/mysqld2-bin.log
+log-bin-index  = /var/lib/mysql2/mysqld2-bin.index
+
+[mysqld3]
+server-id      = 3
+pid-file = /var/run/mysqld/mysqld3.pid
+socket  = /var/run/mysqld/mysqld3.sock
+port  = 3309
+datadir = /var/lib/mysql3
+log-bin        = /var/lib/mysql3/mysqld3-bin.log
+log-bin-index  = /var/lib/mysql3/mysqld3-bin.index
+
+[mysqld4]
+server-id      = 4
+pid-file = /var/run/mysqld/mysqld4.pid
+socket  = /var/run/mysqld/mysqld4.sock
+port  = 3310
+datadir = /var/lib/mysql4
+log-bin        = /var/lib/mysql4/mysqld4-bin.log
+log-bin-index  = /var/lib/mysql4/mysqld4-bin.index
 
 [mysqldump]
 quick

Tuesday, November 03, 2009

Bisection testing using Quilt

Having produced a nice little series of 124 patches (yes, really), I recently had to find out which patch caused distcheck to fail. Since distcheck takes quite some time to execute, I wanted to make as few runs as possible.

In Git, there is the bisect command that can be used to perform bisection testing of a series of patches, but quilt does not have anything like that, so to simplify my job, I needed to implement that for quilt.

I started by defining a shell function that did the actual test, and returned the result.

do_test () {
    echo -n "running distcheck..."
    make -j6 distcheck >/dev/null 2>&1
}
After that, I added code to initialize the variables used and to process the options to the script. The script supports two options: --lower and --upper. Both accept the number of a patch: the number of the last patch known to pass the test and the number of the last patch known to fail it. I could have supplied the patch names here, but this was good enough for my purposes.

Note that I am using Bash since it has support for arrays.

series=(`quilt series`)                  # Array of the patch names
lower=0                                  # Lowest item in tested range
upper=$((${#series[@]} - 1))             # Upper limit of range

while true; do
    case "$1" in
        -l|--lower)
            lower="$2"                   # The value follows the option
            shift 2
            ;;
        -u|--upper)
            upper="$2"
            shift 2
            ;;
        *)
            break
            ;;
    esac
done

middle=$(($lower + ($upper - $lower) / 2))
We then prepare for the loop by moving to the middle of the patches in the range.
quilt pop -a >/dev/null
quilt push $middle >/dev/null
The main loop will keep pushing or popping depending on whether the current patch passes or fails the test. The invariant for the loop is that $middle holds the number of the current patch to be tested (the patch that quilt top would report), and we keep looping until $lower == $upper. Just to ensure that the right patch is tested, we test the invariant in the loop.

  • If the test succeeds, we know that the first failing patch is somewhere between this patch and the last known failing patch. So, we compute the next midpoint to be between this patch and the last known failing patch and store it in middle. We then push patches to reach this patch.
  • If the test fails, we know that the first failing patch is somewhere between the current patch and the last known successful patch. So, we compute the next midpoint to be between this patch and the last successful patch and store it in middle. We then pop patches to reach this patch.
while test $lower -lt $upper
do
    top=`quilt top`
    echo -n "$top..."

    if test "$top" != "${series[$(($middle-1))]}"; then
        echo "invariant failed ($top != ${series[$(($middle-1))]})!" 1>&2
        exit 2
    fi

    if do_test $lower $upper; then
        lower=$(($middle + 1))
        middle=$(($lower + ($upper - $lower) / 2))
        cnt=$(($middle - $lower + 1))
        echo -n "succeeded..."
        if test $cnt -gt 0; then
            echo -n "pushing $cnt patches..."
            quilt push $cnt >/dev/null
            echo "done"
        fi
    else
        upper=$middle
        middle=$(($lower + ($upper - $lower) / 2))
        cnt=$(($upper - $middle))
        echo -n "failed..."
        if test $cnt -gt 0; then
            echo -n "popping $cnt patches..."
            quilt pop $cnt >/dev/null
            echo "done"
        fi
    fi
done
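To convince myself that the search converges quickly, the same idea can be sketched in Python against a simulated test instead of quilt and distcheck. This is a slightly simplified variant of the loop above, not a translation of it: first_failing and the passes callback are names made up for the example, and it keeps one boundary known to pass and one known to fail.

```python
def first_failing(lower, upper, passes):
    """Return the first failing patch number and the patch numbers tested.

    Patches are numbered from 1. `lower` is a patch count known to pass
    (0 means no patches applied) and `upper` is a patch known to fail.
    """
    tested = []
    while upper - lower > 1:
        middle = lower + (upper - lower) // 2
        tested.append(middle)
        if passes(middle):
            lower = middle      # everything up to middle is fine
        else:
            upper = middle      # the culprit is middle or earlier
    return upper, tested


# Simulated series of 124 patches in which patch 87 breaks the build.
culprit, tested = first_failing(0, 124, lambda n: n < 87)
print("first failing patch:", culprit, "after", len(tested), "test runs")
```

With 124 patches, the number of test runs is bounded by the base-2 logarithm of the range, which is the whole point when each run is a full distcheck.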
Next task: extend quilt to support the bisect command.

Wednesday, February 25, 2009

Mixing engines in transactions

As Gary already pointed out, the replication behavior when mixing non-transactional and transactional tables has changed between 5.1.30 and 5.1.31. Why did it change? Well, for starters, it actually never worked and the existing behavior was fooling people into thinking that it did.

There are several bugs reported on mixing transactional and non-transactional tables in statements and in transactions. For the two latest examples, see BUG#28976 and BUG#40116. To explain the situation, I will first start with some background...

The binary log

For this discussion, we call a statement that manipulates transactional tables only a transactional statement and call it a non-transactional statement otherwise.

The binary log is a serial history of the transactions executed on the master, that is, each transaction is written to the binary log at commit time. To handle this, the binary log has a thread-specific transaction cache as well as the actual binary log. Whenever a transactional statement arrives at the binary log, it is cached in the transaction cache, and once the transaction commits, the transaction cache is written to the binary log. Non-transactional statements, on the other hand, are written directly to the binary log since they take effect immediately. Note that this is an idealized picture of how it works, and it lacks a lot of critical details, which we will cover below.
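The idealized picture can be written down as a toy model. This is only a sketch of the description above, not of the server code: the BinaryLog class, the session names, and the table names are invented for the example.

```python
class BinaryLog:
    """Toy model: one shared log plus a transaction cache per session."""

    def __init__(self):
        self.log = []
        self.caches = {}

    def statement(self, session, text, transactional):
        if transactional:
            # Held back until the transaction commits.
            self.caches.setdefault(session, []).append(text)
        else:
            # Takes effect immediately, so it is logged immediately.
            self.log.append(text)

    def commit(self, session):
        cached = self.caches.pop(session, [])
        if cached:
            self.log.extend(["BEGIN"] + cached + ["COMMIT"])

    def rollback(self, session):
        self.caches.pop(session, None)


binlog = BinaryLog()
binlog.statement("T1", "INSERT INTO innodb_tbl VALUES (1)", transactional=True)
binlog.statement("T2", "INSERT INTO innodb_tbl VALUES (2)", transactional=True)
binlog.statement("T1", "INSERT INTO myisam_tbl VALUES (3)", transactional=False)
binlog.commit("T2")
binlog.commit("T1")
print("\n".join(binlog.log))
```

Note how the log ends up in commit order, not in the order the statements were issued, and how the non-transactional statement overtakes both transactions.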

Non-transactional statements in transactions

When using a non-transactional table in a transaction, the effects are actually written directly to the table and not managed by the transaction at all. Thanks to the locking scheduler, this can guarantee a serialization order of the statements, but there are no guarantees of atomicity. Note, however, that the locks are released at the end of the statement, not the transaction. This is a result of the MyISAM legacy, which does not have a notion of transactions. In other words, this is what you get in a sample execution:
T1> create table myisam_tbl (a int) engine=myisam;
Query OK, 0 rows affected (0.04 sec)

T1> create table innodb_tbl (a int) engine=innodb;
Query OK, 0 rows affected (0.00 sec)

T1> begin;
Query OK, 0 rows affected (0.00 sec)

T1> insert into myisam_tbl values (1),(2);
Query OK, 2 rows affected (0.04 sec)
Records: 2  Duplicates: 0  Warnings: 0

T1> insert into innodb_tbl values (1),(2);
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

T2> select * from myisam_tbl;
+------+
| a    |
+------+
|    1 | 
|    2 | 
+------+
2 rows in set (0.00 sec)

T2> select * from innodb_tbl;
Empty set (0.00 sec)

Replicating non-transactional statements in transactions is easy

So, how should one handle this case in replication? Note that we right now only consider statement-based replication (but we will consider row-based replication later).

So, let us say that we have the transaction above, that is:

BEGIN;
INSERT INTO myisam_tbl VALUES (1),(2);
INSERT INTO innodb_tbl VALUES (1),(2);
COMMIT;
As far as possible, we want to mimic the actual behavior, meaning that the non-transactional change should be replicated to the slave as soon as possible so that even if the transaction does not commit for a long time, the MyISAM change should be visible on the slave immediately.

OK, this is simple: if a non-transactional statement is inside a transaction, we just write it to the binary log. The effects have already taken place, so of course we write it directly to the binary log (right?).

Fine, so what about this case then:

BEGIN;
INSERT INTO innodb_tbl VALUES (1),(2);
INSERT INTO myisam_tbl SELECT * FROM innodb_tbl;
COMMIT;
Ew, um, well... no, that does not work. If the myisam_tbl insert is written before the innodb_tbl insert, it will not have the 1 and 2.

Bummer...

Hum... OK, so let's say that we only write the non-transactional statement that is first in the transaction directly to the binary log. Then we will have it replicated directly to the slave, but for the other case, when the non-transactional statement is not first in the transaction, we will have the non-transactional statement stored in the transaction cache so that it can read from the transactional table.

This is how it worked in MySQL at least since 4.1 (I didn't look earlier), and it serves well...

... unless you consider all the caveats

That was a very rosy picture, but the reality is not that easy to handle.

Rolling back a transaction.

If the transaction is rolled back, and a non-transactional statement was in the transaction cache, it is still necessary to write the transaction to the binary log (with a ROLLBACK last). This is wasting resources (disk space) since if the transaction contained only transactional statements, it could just be discarded.

Also, it will be incorrect in the case that the non-transactional engine is replaced with a transactional engine on the slave, or if there is a crash after the non-transactional statement. It only works if the statement is first in the transaction, because then it will be committed as a separate transaction ahead of the transaction that is rolled back.

For this reason...

Non-transactional statements need to be at the beginning of a transaction.

Since only the statements that are at the beginning are "written ahead", it means that the user has to remember to put the non-transactional statements first in the transaction. This lulls an inexperienced user, or one that just didn't consider replication when writing the transactions, into the idea that replication can handle the transaction even when the statement is not first in the transaction.

Incidentally, since we're talking about non-transactional statements...

What statements are non-transactional?

So, this is the scenario mentioned in BUG#40116. Let us introduce three tables: one MyISAM log table and two InnoDB tables holding the business data. We then add a trigger to log any changes to one of the InnoDB tables like this:
CREATE TABLE tbl (f INT) ENGINE=INNODB;
CREATE TABLE extra (f INT) ENGINE=INNODB;
CREATE TABLE log (r INT) ENGINE=MYISAM; 
CREATE TRIGGER tbl_tr AFTER INSERT ON tbl FOR EACH ROW
    INSERT INTO log VALUES ( NEW.f );
Now, let's have a look at this transaction (deliberately without a COMMIT or ROLLBACK):
INSERT INTO tbl VALUES (1);
INSERT INTO extra VALUES (2);
What about the first statement in the transaction? Is it non-transactional or transactional? What do we do once we have seen only that statement?

If we treat the statement as a non-transactional statement and write it ahead of the transaction, we have to make a decision there and then on whether to commit it or to roll it back, which is just not possible (it will be wrong whatever we decide).

Another alternative is to look at the "top level" table and treat the statement as transactional (because tbl is transactional). This appears to be what 5.0 is doing. This means that the statement would be put in the transaction cache, but if it is later decided that the statement should roll back, we have to write the transaction to the binary log with a ROLLBACK last. This would work in an acceptable manner, but what happens if the engines are switched so that the non-transactional table is the "top level" table? Does this mean that the statement suddenly becomes non-transactional and is written ahead of the transaction? If we do that, we will have all the problems described in the previous paragraph about being forced to make a good decision there and then.

Ooooookey... so we have to cache the statement even if it contains non-transactional changes. Fine. The only case that we can actually write ahead is when we have a non-transactional statement not containing any transactional changes and that statement is first in a transaction... and there is a lot of logic to check that case.

So, we decided to remove that logic and always write statements inside a transaction to the transaction cache. The only remaining piece of the logic is that a transaction containing a non-transactional change is written to the binary log with a ROLLBACK last. If there are only transactional changes, the transaction cache is just tossed and nothing written to the binary log.
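The simplified rule is small enough to sketch as a toy model. Again, this is an illustration of the rule as described here, not the server implementation; the Session class and its flag are names I made up for the example.

```python
class Session:
    """Toy model of one session's transaction cache under the simplified rule."""

    def __init__(self, binlog):
        self.binlog = binlog
        self.cache = []
        self.has_non_transactional = False

    def statement(self, text, non_transactional=False):
        # Every statement inside a transaction goes to the cache.
        self.cache.append(text)
        self.has_non_transactional |= non_transactional

    def commit(self):
        self._flush("COMMIT")

    def rollback(self):
        if self.has_non_transactional:
            # The non-transactional change cannot be undone, so log it anyway.
            self._flush("ROLLBACK")
        else:
            self._reset()

    def _flush(self, last):
        self.binlog.extend(["BEGIN"] + self.cache + [last])
        self._reset()

    def _reset(self):
        self.cache = []
        self.has_non_transactional = False


binlog = []
session = Session(binlog)
# The trigger on tbl writes to the MyISAM log table, so this statement
# carries a non-transactional change.
session.statement("INSERT INTO tbl VALUES (1)", non_transactional=True)
session.statement("INSERT INTO extra VALUES (2)")
session.rollback()
print("\n".join(binlog))
```

A rollback of a transaction that touched only transactional tables leaves the binary log untouched, which is the "just tossed" case.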

The only thing remaining is to print a warning when a statement containing non-transactional changes is put in the transaction cache. This is not the case right now: the server prints a warning when a transaction holding a non-transactional change is rolled back, which in my view is a tad too late, since the problem actually occurs when the statement is written to the binary log.

What about row-based replication?

Until now, we have discussed statement-based replication only, but for row-based replication we can actually do better. Any changes that are non-transactional can be written ahead of the transaction since there are no dependencies on statements inside the transaction. There are only two problems:
  1. For performance reasons, rows are cached and written to the binary log when the statement commits. This means that if there is a crash before finishing the statement, the rows written for the statement as far as it got are lost. For most transactional engines, this is not a problem, since the changes will be rolled back, but for MyISAM, the changes can stay, leading to inconsistencies between the tables on the master and the slave. Since it is not reasonable to handle this case, we assume that each statement for a non-transactional engine is atomic.
  2. There is only one transaction cache, and for the non-transactional changes to "overtake" the transactional changes, we need an additional cache that is flushed for each statement. This will be implemented as part of WL#2687.

Wednesday, August 20, 2008

The missing pieces in the protobuf binary log

Protobuf comes with a minor problem: it does not have support for handling "type tagged structures", that is, something reminiscent of objects in OOP lingo, so if you are going to have a heterogeneous sequence of messages, you have to roll it yourself. For that reason, I added a transport frame for the messages in the binary log that wraps each with some extra information. In addition to allowing the binary log to be a sequence of messages, it also adds some integrity-checking data and simplifies some administrative tasks.

Transport frame with message
Length
Type Tag
Message
Checksum
The format of each message in the sequence is given in the table above, where length is a specially encoded length that we will go through below, type is a single byte holding the type tag, message is one of the messages given in the specification, and checksum is a checksum to ensure the integrity of the transport.

Checksum. As checksum, the plan is to use a CRC-32. We don't want it to be too large to affect performance, and we want it to catch reasonable losses of integrity. I'm considering storing this as a varint after the actual message, but for the time being, it is given as 4 raw bytes (it is not implemented at all yet). Please give me feedback on this: if we make it a varint, we can stuff the checksum in there, but that will also run the risk of not being able to read the checksum due to corruption of the checksum, so offhand, I would say that a fixed number of bytes is preferable.

Type Tag. The type tag is a single byte giving the type of the event. This means that we are limited to 255 events, but considering that we don't even have 26 events in 5.1 right now, I don't see that we will run into that limit very soon. It is possible to put the type tag in the message as well, and specify it as an enum inside the protobuf specification, but that will just provide the information in two places, so it is better to keep it separate.

Length. The length is the length of the message, that is, it does not include the type tag, checksum, nor the length itself. This simplifies the normal processing, and in the event that one needs to skip an event, it is easy to compute the next position of a transport frame by just decoding the length (see below).
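To make the frame layout concrete, here is a minimal Python sketch of packing and unpacking frames. Several things in it are my assumptions for the example, since they are not settled above: the checksum is taken over the type tag and the message, it is stored big-endian, zlib's CRC-32 is used, and only messages short enough for a one-byte length (2 to 255 bytes) are handled.

```python
import struct
import zlib


def pack_frame(type_tag, message):
    """Build length + type tag + message + 4-byte checksum."""
    if not 2 <= len(message) <= 255:
        raise ValueError("this sketch only handles one-byte lengths")
    body = bytes([type_tag]) + message
    checksum = zlib.crc32(body) & 0xFFFFFFFF   # assumed to cover tag + message
    return bytes([len(message)]) + body + struct.pack(">I", checksum)


def unpack_frame(buf, offset=0):
    """Return (type_tag, message, next_offset); raise ValueError on corruption."""
    length = buf[offset]
    start = offset + 1
    end = start + 1 + length                   # type tag plus message
    body = buf[start:end]
    (stored,) = struct.unpack(">I", buf[end:end + 4])
    if zlib.crc32(body) & 0xFFFFFFFF != stored:
        raise ValueError("checksum mismatch")
    return body[0], body[1:], end + 4


def next_frame_offset(buf, offset=0):
    # Skipping needs only the length: length byte + tag + message + checksum.
    return offset + 1 + 1 + buf[offset] + 4


stream = pack_frame(1, b"x" * 16) + pack_frame(2, b"y" * 20)
tag, message, position = unpack_frame(stream)
print(tag, len(message), position == next_frame_offset(stream))
```

Since the length excludes everything but the message, skipping an event is a single addition once the length is decoded, which is the property the text is after.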

Length encoding

The length is encoded using a special scheme to allow for very little overhead for small events while still leaving room for giving the length of very large events. This scheme currently allows for a compact representation of lengths from 2 bytes to 4 GiB (Gigabytes) and, if you don't need to have a compact representation of 2, you can represent lengths in the range 3 bytes to 16 EiB (Exabytes) [sic].

The basic idea is to note that the length of a message can never be zero, and the minimal length in this case is actually 16 bytes. Since we will never have a length that is less than 16 for an event, that leaves the values 0-15 available for denoting other information. The obvious solution is to let the first byte denote either the length, if it is, say, greater than 8, or the number of bytes following the byte that gives the length if it is less than or equal to 8, but we can actually do better.

Storing Length requires ceil(log256(Length + 1)) bytes, so if we let the first byte L be

ceil(log2(ceil(log256(Length + 1)))) - 1
(that is, the exponent of the smallest power of two that is at least the number of bytes needed for the length, minus 1), we can get away with reserving significantly fewer values. So, for example:

Bytes             Value
2A                42
00 FF 02          767
01 FF 01 01 00    66047
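The three examples can be checked with a few lines of Python. This is a straightforward sketch of the scheme for experimenting, not the implementation discussed in this post; it assumes little-endian length bytes (as the examples show) and only the prefixes 0 and 1, so it stops at lengths that fit in four bytes.

```python
def length_encode(length):
    """Encode a length as one literal byte or a prefix byte plus 2 or 4 bytes."""
    if length < 2:
        raise ValueError("the values 0 and 1 are reserved as prefixes")
    if length < 256:
        return bytes([length])
    needed = (length.bit_length() + 7) // 8    # bytes needed for the length
    prefix, size = 0, 2
    while size < needed:                       # round up to a power of two
        prefix += 1
        size *= 2
    if prefix > 1:
        raise ValueError("longer lengths would need more reserved prefixes")
    return bytes([prefix]) + length.to_bytes(size, "little")


def length_decode(buf):
    """Return (length, number of bytes consumed)."""
    first = buf[0]
    if first > 1:
        return first, 1
    size = 1 << (first + 1)
    return int.from_bytes(buf[1:1 + size], "little"), 1 + size


for value in (42, 767, 66047):
    print(value, length_encode(value).hex())
```

Python's arbitrary-precision integers hide the bit fiddling, so this version is mainly useful as a reference to test a lower-level implementation against.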

Computing the number of bytes can be done by computing 1 << (L + 1), but computing the inverse is a little more involved. The following two functions do the job. Although it looks like a lot of code, length_encode() is actually only 29 instructions on my machine (no function calls), while length_decode() is about 7 instructions. The trick here is to compute the logarithm at the same time as we serialize the bytes into memory, so the overhead compared to just serializing the length is just a few instructions.

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

inline unsigned char *
length_encode(size_t length, unsigned char *buf)
{
  unsigned char *ptr= buf;
  assert(length > 1);
  if (length < 256)
    *ptr++= length & 0xFF;
  else {
    int_fast8_t log2m1= -1;        // ceil(log2(ptr - buf)) - 1
    uint_fast8_t pow2= 1;          // pow2(log2m1 + 1)
    while (length > 0) {
      // Check the invariants
      assert(pow2 == (1 << (log2m1 + 1)));
      assert((ptr - buf) <= (1 << (log2m1 + 1)));

      // Write the least significant byte of the current
      // length. Prefix increment is used to make space for the first
      // byte that will hold log2m1.
      *++ptr= length & 0xFF;
      length >>= 8;

      // Ensure the invariant holds by correcting it if it doesn't,
      // that is, the number of bytes written is greater than the
      // nearest power of two.
      if (ptr - buf > pow2) {
        ++log2m1;
        pow2 <<= 1;
      }
    }
    // Clear the remaining bytes up to the next power of two
    while (++ptr < buf + pow2 + 1)
      *ptr= 0;
    *buf= log2m1;
    assert(ptr == buf + pow2 + 1);
  }
  return ptr;
}

inline const unsigned char *
length_decode(const unsigned char *buf, size_t *plen)
{
  if (*buf > 1) {
    *plen = *buf;
    return buf + 1;
  }

  size_t bytes= 1U << (*buf + 1);
  const unsigned char *ptr= buf + 1;
  size_t length= 0;
  for (unsigned int i = 0 ; i < bytes ; ++i)
    length |= (size_t) *ptr++ << (8 * i);   // widen before shifting
  *plen= length;
  return ptr;
}

Tuesday, August 19, 2008

Using protobuf for designing and implementing replication in Drizzle

So, following the lead of Brian, I spent a few hours of the weekend creating a very simple replication scheme for Drizzle using protobuf for specifying the binary log events.

Since we are developing replication for a cloud, there are a few things we have to consider:

  • Servers are unreliable. We shall not trust servers; we shall expect them to crash at the worst possible time. (Murphy is a very good friend of mine, you know. He must be, since he visits me very often.) This means that we need to have support to allow statements to be sent to the slaves before the transaction is complete, which means that we need to support interleaving and hence both commit and rollback events.

    In order to handle interleaving, we need to have a transaction id, but in order to handle session specific data in the event of a crash (for example, temporary tables), we need to have a session id as well. However, the session id is only needed for statements, not for other events, so we add it there. This will allow the slave to expire any session objects when necessary.

    Since we cannot always know if the transaction is complete when a statement has been executed, we need to have the commit and rollback events as separate events, instead of using the alternative approach of adding flags to each query event.

  • Reconnections are frequent. Since masters go up and down all the time, we have to do what we can to make reconnections to another master easy. Among other things, it means that we cannot interrogate the master of a slave after it has crashed to figure out where we should start replication, so we need some form of Global Transaction ID to be able to decide where to start replication when connecting to another master.

    In our case, we want the transaction id to be transferable to other servers as well, so we combine the server id and the transaction id to form the Global Transaction ID for this replication.

    Since reconnections are frequent, we also need techniques for resolving conflicts between events, and using a timestamp is one such technique. To handle that, we add a timestamp to each event, and we make room for a nanosecond-precision timestamp from the start, meaning that we need at least 64 bits for it.

  • Network is not reliable. We expect the cloud to be spread all over the planet, so we cannot really trust the network to provide a reliable transport. This means that we need some form of checksum on each event to ensure that it was transferred correctly.
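
As a rough illustration of the per-event checksum mentioned in the last point, here is a minimal sketch. The post does not say which checksum is intended, so CRC-32 is an assumption, and the names event_crc32, seal_event and verify_event are hypothetical helpers, not part of any Drizzle code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Bitwise CRC-32 (reflected polynomial 0xEDB88320). Chosen here only as a
// familiar example of an event checksum; the post does not name one.
inline std::uint32_t event_crc32(const std::string& data) {
  std::uint32_t crc = 0xFFFFFFFFu;
  for (std::size_t i = 0; i < data.size(); ++i) {
    crc ^= static_cast<unsigned char>(data[i]);
    for (int bit = 0; bit < 8; ++bit)
      crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
  }
  return ~crc;
}

// Append the checksum to a serialized event as four little-endian bytes.
inline std::string seal_event(const std::string& payload) {
  const std::uint32_t sum = event_crc32(payload);
  std::string out = payload;
  for (int i = 0; i < 4; ++i)
    out.push_back(static_cast<char>((sum >> (8 * i)) & 0xFF));
  assert(out.size() == payload.size() + 4);
  return out;
}

// Return true if the trailing four bytes match the checksum of the rest.
inline bool verify_event(const std::string& sealed) {
  if (sealed.size() < 4)
    return false;
  const std::string payload = sealed.substr(0, sealed.size() - 4);
  return seal_event(payload) == sealed;
}
```

A receiver would call verify_event on each event as it arrives and ask for a retransmission (or reconnect) when the check fails.
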

Now, for the first simple implementation, we're aiming at statement-based replication, which means that there has to be a way to transfer session context information. Remember that statement-based replication just sends the statements that change the state of the database, but the effect of a statement can also depend on the context of the session. For example, the INSERT statement below cannot be reliably replicated without the value of the user variable @foo.
SET @foo = 'Hello world!';
INSERT INTO whatever VALUES (@foo);
This is already handled in MySQL Replication by preceding each query log event with a sequence of context events, but for this solution there is another approach that is more appropriate: adding a set of name-value pairs to the query event. Another problem is that there are functions that are non-deterministic, or context dependent in other ways, but these can be handled by rewriting the queries as follows:

Instead of...
INSERT INTO info VALUES (UUID(), 47);

...use this:
SET @tmp = UUID();
INSERT INTO info VALUES (@tmp, 47);
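
The rewrite can be sketched mechanically. The following is a deliberately naive illustration, not how any server does it: it searches for the literal text UUID() (case-sensitive, with no awareness of string literals or comments), and the Rewritten struct, the rewrite_uuid function and the @tmpN variable naming are all made up for the example.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Result of the rewrite: statements to run first, then the changed query.
struct Rewritten {
  std::vector<std::string> prelude;  // one SET statement per UUID() call
  std::string query;                 // query with each call replaced
};

// Replace every textual occurrence of UUID() with a fresh user variable.
inline Rewritten rewrite_uuid(const std::string& query) {
  const std::string call = "UUID()";
  Rewritten result;
  std::string::size_type pos = 0;
  std::string::size_type hit;
  unsigned int n = 0;
  while ((hit = query.find(call, pos)) != std::string::npos) {
    std::ostringstream name;
    name << "@tmp" << n++;
    result.prelude.push_back("SET " + name.str() + " = UUID();");
    result.query.append(query, pos, hit - pos);  // text before the call
    result.query += name.str();                  // variable instead of call
    pos = hit + call.size();
  }
  result.query.append(query, pos, std::string::npos);  // remaining text
  assert(result.query.find(call) == std::string::npos);
  return result;
}
```

With this scheme the value assigned to each variable is fixed once on the master and can travel with the query as one of its name-value pairs.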

Now, the protobuf specification for all the above items, including some events used to control the storage of the binary log, is:

package BinaryLog;

message Header {
  required fixed64 timestamp = 1;
  required uint32 server_id = 2;
  required uint32 trans_id = 3;
}

message Start {
  required Header header = 1;
  required uint32 server_version = 2;
  required string server_signature = 3;
}

message Chain {
  required Header header = 1;
  required uint32 next = 2;            // Sequence number of next file
}

message Query {
  message Variable {
    required string name = 1;
    required string value = 2;
  }

  required Header header = 1;
  repeated Variable variable = 2;
  required uint32 session_id = 3;
  required string query = 4;
}

message Commit {
  required Header header = 1;
}

message Rollback {
  required Header header = 1;
}
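
To make the role of the separate Commit and Rollback messages concrete, here is a small sketch of how a slave might handle interleaved transactions. It assumes the slave buffers statements per Global Transaction ID and only applies them on commit; the Applier class and its method names are hypothetical, and "applying" is reduced to appending to a list.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// (server_id, trans_id): the Global Transaction ID built from the Header.
typedef std::pair<std::uint32_t, std::uint32_t> GlobalId;

// Hypothetical slave-side buffer: statements are held per transaction until
// a Commit or Rollback event with the same Global Transaction ID arrives.
class Applier {
public:
  // Query event: remember the statement, do not execute it yet.
  void on_query(const GlobalId& id, const std::string& query) {
    assert(!query.empty());  // an empty statement suggests a malformed event
    m_pending[id].push_back(query);
  }

  // Commit event: move the buffered statements to the applied list.
  void on_commit(const GlobalId& id) {
    const std::vector<std::string>& stmts = m_pending[id];
    m_applied.insert(m_applied.end(), stmts.begin(), stmts.end());
    m_pending.erase(id);
  }

  // Rollback event: drop everything buffered for the transaction.
  void on_rollback(const GlobalId& id) { m_pending.erase(id); }

  const std::vector<std::string>& applied() const { return m_applied; }
  std::size_t open_transactions() const { return m_pending.size(); }

private:
  std::map<GlobalId, std::vector<std::string> > m_pending;
  std::vector<std::string> m_applied;
};
```

Statements from two transactions can arrive in any interleaving; only those belonging to a committed transaction end up in applied(), in the order they were received within that transaction.
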
After tossing together a reader and a writer for the format, the result is:
$ ./binlog_writer --trans 1 \
> --set nick=mkindahl --set name='Mats Kindahl' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=krow --set name='Brian Aker' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=mtaylor --set name='Monty Taylor' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_reader
# Global Id: (1,0)
# Timestamp: 484270929829911
set @name = 'Mats Kindahl'
set @nick = 'mkindahl'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484330886264299
set @name = 'Brian Aker'
set @nick = 'krow'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484391458447787
set @name = 'Monty Taylor'
set @nick = 'mtaylor'
INSERT INTO whatever VALUES (@nick,@name)
$ ls -l log.bin
-rw-r--r-- 1 mats bazaar 311 2008-08-19 14:22 log.bin
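
For readers curious how output like the above could be produced, here is a sketch of the printing step only. It uses a plain struct as a stand-in for the generated Query class, so QueryEvent, print_query and format_query are hypothetical names, and the order of the two numbers in "Global Id" (server id first) is an assumption. Values are quoted naively, without escaping.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Plain-struct stand-in for the Query message; a real reader would use the
// classes generated by protoc instead.
struct QueryEvent {
  std::uint64_t timestamp;
  std::uint32_t server_id;
  std::uint32_t trans_id;
  std::vector<std::pair<std::string, std::string> > variables;
  std::string query;
};

// Print one event in the style of the reader output shown above.
inline void print_query(std::ostream& out, const QueryEvent& ev) {
  assert(!ev.query.empty());  // the query field is required in the spec
  out << "# Global Id: (" << ev.server_id << "," << ev.trans_id << ")\n"
      << "# Timestamp: " << ev.timestamp << "\n";
  for (std::size_t i = 0; i < ev.variables.size(); ++i)
    out << "set @" << ev.variables[i].first
        << " = '" << ev.variables[i].second << "'\n";
  out << ev.query << "\n";
}

// Helper for trying the sketch out: format an event into a string.
inline std::string format_query(const QueryEvent& ev) {
  std::ostringstream out;
  print_query(out, ev);
  return out.str();
}
```
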
Protobuf Rocks! You can find the branch containing the ongoing development of this at Launchpad. Right now, there are no changes to the server, since we want the format to be stable first, so the branch is merged on a regular basis to the main tree as well.

Thursday, August 14, 2008

A join I/O manipulator for IOStream

I started playing around with protobuf when doing some stuff in Drizzle (more about that later), and since the examples were using IOStream, the table reader and writer that Brian wrote are using IOStreams. Now, IOStreams is pretty powerful, but it can be a pain to use, so of course I started tossing together some utilities to make it easier to work with.

Having been a serious Perl addict for 20 years, I of course started missing a lot of nice functions for manipulating strings, and the most immediate one is join, so I wrote a C++ IOStream manipulator to join the elements of an arbitrary sequence and output them to a std::ostream.

In this case, since the I/O manipulator takes arguments, it has to be written as a class. Recall that std::cout << foo(3) is just shorthand for operator<<(std::cout, foo(3)). Since we want to avoid constructing the full string before writing it to the output stream, we define our own joiner class and create an operator<<(std::ostream&, const joiner&) function that works with the joiner class in the following manner:

#include <ostream>
#include <string>

template <class FwdIter> class joiner {
  friend std::ostream& operator<<(std::ostream& out, const joiner& j) {
    j.write(out);
    return out;
  }

public:
  explicit joiner(const std::string& separator, FwdIter start, FwdIter finish)
    : m_sep(separator), m_start(start), m_finish(finish)
  { }

private:
  std::string m_sep;
  FwdIter m_start, m_finish;

  void write(std::ostream& out) const {
    FwdIter fi = m_start;
    if (m_start == m_finish)
      return;
    while (true) {
      out << *fi;
      if (++fi == m_finish)
        break;
      else
        out << m_sep;
    }
  }
};
So, now we can write something like:
std::cout << joiner<std::vector<int>::const_iterator>(",", seq.begin(), seq.end())
          << std::endl;
This is an awful lot to type, and especially difficult to maintain since the type of the sequence we are printing might change, so we introduce two helper functions to infer the iterator type for us:
template <class FwdIter>
joiner<FwdIter>
join(const std::string& delim, FwdIter start, FwdIter finish) {
  return joiner<FwdIter>(delim, start, finish);
}

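// Take the container by const reference: taking it by value would return
// iterators into a copy that is destroyed when join() returns.
template <class Container>
joiner<typename Container::const_iterator>
join(const std::string& delim, const Container& seq) {
  typedef typename Container::const_iterator FwdIter;
  return joiner<FwdIter>(delim, seq.begin(), seq.end());
}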
Now we can use the following code to write out a comma-separated sequence and let the compiler infer the types for us.
std::cout << join(",", seq.begin(), seq.end())
          << std::endl;
or even more compactly
std::cout << join(",", seq) << std::endl;
Update: There was a bug that caused the write() function above to try to read the first element of an empty sequence. I have added a check at the top of write() (the m_start == m_finish test) so that empty sequences are handled.
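
For anyone who wants to try the manipulator, including the empty-sequence case, here is a compact, self-contained variant. It is a sketch rather than the exact code above: the loop is restructured, only the container overload of join is shown and it takes a const reference, and join_to_string is a hypothetical helper added only so that the result can be inspected.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

template <class FwdIter> class joiner {
  // Write the elements with the separator between them; an empty range
  // writes nothing because the loop body is never entered.
  friend std::ostream& operator<<(std::ostream& out, const joiner& j) {
    for (FwdIter it = j.m_start; it != j.m_finish; ++it) {
      if (it != j.m_start)
        out << j.m_sep;
      out << *it;
    }
    return out;
  }

public:
  joiner(const std::string& separator, FwdIter start, FwdIter finish)
    : m_sep(separator), m_start(start), m_finish(finish)
  { }

private:
  std::string m_sep;
  FwdIter m_start, m_finish;
};

// Infer the iterator type from the container, which must outlive the
// returned joiner.
template <class Container>
joiner<typename Container::const_iterator>
join(const std::string& delim, const Container& seq) {
  typedef typename Container::const_iterator FwdIter;
  return joiner<FwdIter>(delim, seq.begin(), seq.end());
}

// Hypothetical helper: render a joined sequence into a string.
template <class Container>
std::string join_to_string(const std::string& delim, const Container& seq) {
  std::ostringstream out;
  out << join(delim, seq);
  return out.str();
}
```

With a std::vector<int> holding 1, 2 and 3, join_to_string(",", seq) gives "1,2,3", and an empty vector gives an empty string.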