While designing and developing the Lily CMS we identified the need for two components: a Write Ahead Log and a Message Queue. We implemented both on top of a RowLog library, which is described and made available below. The RowLog library should be useful outside the Lily context, which is why we're sharing it here.
First, let's talk about the use cases of a WAL and MQ:
Suppose you have an HBase table and an index that must be updated each time a row of this table is created, updated or deleted. You then need a system that guarantees the index update actually happens, even when nodes fail. A Write Ahead Log (WAL) can give you such a guarantee.
The purpose of the WAL is to guarantee the execution of "secondary,
subordinate actions" in case a Lily repository node dies in the midst of
their execution. These actions are synchronous, which means that (1) they
should complete before returning execution to the caller, and (2) they should
complete before the next update on the record is allowed to happen.
The two kinds of secondary actions we currently foresee are:
In general, we assume that the execution of secondary actions should succeed. In other words, we assume that the subsystems we call for these actions (such as the message queue) are highly available and that, if they are not available, it is okay for the repository to deny any further updates to the records.
To enforce the execution of secondary actions before a new update on a record can happen, we need to keep track of the still-to-be-executed secondary actions in an index which we will call the WAL. When a new update comes in, the WAL is checked for any open secondary actions on that record, and these are executed first. In case no new updates for the record arrive soon, we would still like the secondary actions to be executed within a reasonable time of their original update. Therefore a cleanup process is needed that periodically checks the WAL and executes any open secondary actions.
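The bookkeeping described above can be illustrated with a minimal in-memory sketch. It is not the Lily implementation: the class `OpenActionLog` and its methods are hypothetical names, and a plain map stands in for the HBase-backed WAL. Here `update` deliberately leaves the new secondary action open, which mimics a node dying before the action could run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the WAL: tracks open secondary actions per record. */
final class OpenActionLog {
    private final Map<String, Deque<Runnable>> openActions = new HashMap<>();

    /** Records a secondary action that still has to be executed for the record. */
    synchronized void add(String recordId, Runnable action) {
        openActions.computeIfAbsent(recordId, id -> new ArrayDeque<>()).add(action);
    }

    /** Executes the open actions of one record in arrival order; returns how many ran. */
    synchronized int drain(String recordId) {
        Deque<Runnable> pending = openActions.get(recordId);
        if (pending == null) {
            return 0;
        }
        int executed = 0;
        while (!pending.isEmpty()) {
            pending.peek().run(); // if this throws, the action stays open
            pending.poll();
            executed++;
        }
        openActions.remove(recordId);
        return executed;
    }

    /** Open actions of the record go first, then the update, then the new action is logged. */
    synchronized void update(String recordId, Runnable primaryUpdate, Runnable secondaryAction) {
        drain(recordId);
        primaryUpdate.run();
        add(recordId, secondaryAction);
    }

    /** What a periodic cleanup process could call for records that see no new updates. */
    synchronized int drainAll() {
        int executed = 0;
        for (String recordId : new ArrayList<>(openActions.keySet())) {
            executed += drain(recordId);
        }
        return executed;
    }
}
```

In this sketch a second `update` on the same record first runs the action left open by the previous one, and `drainAll` plays the role of the cleanup process.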
When secondary actions themselves fail (rather than a repository node failure) we have to make a distinction between:
Updates on Lily records should result in updates of a SOLR/Lucene index by delivering messages to an indexer component. These messages can be processed asynchronously, meaning that execution can be returned to the caller before they are handled. Lily's indexer also requires that messages for a specific record are delivered in order, and that no two messages for the same record are processed concurrently.
For this we need some kind of a messaging system through which we can inform the indexer of updates that have happened on a record.
Besides the indexer, other components could also need a message queue, for instance an audit logging system or an email notification system.
An extra requirement is that no messages get lost, so that our index stays as up to date as possible. The messages therefore need to be persisted.
For an MQ system we could opt to use one of the many existing message queue systems. However, we chose to look for a solution on top of HBase. The rationale behind this is that we want to avoid introducing external systems that need extra infrastructure, extra software, extra configuration and administration, extra maintenance and so on.
For both the WAL and MQ we want to feed messages to either secondary tasks or message consumers (indexer). These messages describe the update that happened to a record. It is important to note that these messages are always related to one HBase row in an HBase table that contains the actual data of our CMS. For this reason we introduce a component called the 'RowLog'. This is a generalised component which provides the message queue-like behaviour. The WAL and MQ in our Lily CMS are both based on this RowLog component.
The RowLog Implementation
The RowLog class is the main entry point of the RowLog component.
It accepts messages and stores them for later processing, and maintains the order in which the messages arrived on the RowLog based on a timestamp. The actual storing of the message happens in two places: a RowLogShard and on a row-local level.
The actual storing of a message happens in a RowLogShard. A shard uses an HBase table in which it stores each message under a key made up of a timestamp, which maintains the order in which the messages arrived, the row key, which indicates which HBase row the message is about, and a sequence number, which indicates the position of the message within the set of messages for that particular row. (The use of this sequence number will become clear in the next section.)
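To make the key composition concrete, here is a small sketch using only the JDK. The exact byte layout used by RowLogShardImpl is not described here, so the layout below (8-byte timestamp, 8-byte sequence number, then the row key) and the class name `ShardKeys` are assumptions. The comparison method mimics the unsigned lexicographic byte order in which HBase sorts row keys, which is what makes a leading big-endian timestamp preserve arrival order for non-negative timestamps.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical key layout: [timestamp: 8 bytes][sequence number: 8 bytes][row key]. */
final class ShardKeys {
    private ShardKeys() {
    }

    /** Builds the shard key; ByteBuffer writes longs big-endian by default. */
    static byte[] encode(long timestamp, long seqNr, byte[] rowKey) {
        return ByteBuffer.allocate(16 + rowKey.length)
                .putLong(timestamp)
                .putLong(seqNr)
                .put(rowKey)
                .array();
    }

    static long timestampOf(byte[] key) {
        return ByteBuffer.wrap(key).getLong(0);
    }

    static long seqNrOf(byte[] key) {
        return ByteBuffer.wrap(key).getLong(8);
    }

    static byte[] rowKeyOf(byte[] key) {
        return Arrays.copyOfRange(key, 16, key.length);
    }

    /** Unsigned lexicographic comparison of two keys. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }
}
```

With this layout, scanning the shard table from the start returns the oldest message first, regardless of which row it belongs to.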
When new messages are added in the order of their arrival timestamp, they are
always appended to the end of the HBase table. We also expect that, in normal
operation (especially in the context of the WAL use case), the number of
messages in the table will always remain small, as they will be processed soon
after they have been put on it. For both reasons there is a risk of always
addressing the same region, and at a high influx that one region could become a
bottleneck.
To avoid this bottleneck, messages can be sharded over RowLogShards, where each
shard uses a separate HBase table. The distribution of the messages over the
shards should happen randomly so that each shard is equivalent in load.
Messages about one row can be spread over several shards. Within a shard the order of the messages can still be maintained based on the timestamp order. However, one shard cannot know whether its first message for a particular row has a predecessor in another shard. This is one reason (amongst others, see later) why there is a need for a row-local index, as is explained in the next section.
The row-local index is an index of the messages which we maintain on the HBase row itself which the messages are about. Within this index, the order of the messages is maintained based on sequence number.
For each message, a payload and an execution state are stored in column families which are separate from the row's actual data. The payload holds the actual information of the message, which consumers need in order to process it. The producers and consumers of the message have to agree on the content that is stored in this payload. Next to the payload, an execution state of the message is stored. The execution state maintains a state for each consumer of the message, as will be discussed later.
There are several reasons why we need such a row-local index:
Besides the need for the row-local index, it also gives a few advantages:
A RowLogConsumer is a component that is actually interested in 'consuming' or processing a message. Several consumers can be interested in the same message. Each consumer should register itself with the RowLog.
The row-local index contains the execution state of a message. For each
consumer it is indicated in the execution state if the message has been
processed or not. When all consumers have processed a message, it can be removed
from the row-local index.
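A minimal in-memory sketch of this bookkeeping is shown below. The classes `ExecutionState` and `RowLocalIndex` are hypothetical: in the real library this state lives in the row's column families, whereas here a sorted map keyed by sequence number stands in for it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-message execution state: one "processed" flag per consumer id. */
final class ExecutionState {
    private final Map<Integer, Boolean> processedByConsumer = new LinkedHashMap<>();

    ExecutionState(int... consumerIds) {
        for (int id : consumerIds) {
            processedByConsumer.put(id, false);
        }
    }

    void markProcessed(int consumerId) {
        // replace() returns null when the consumer was never registered for this message
        if (processedByConsumer.replace(consumerId, true) == null) {
            throw new IllegalArgumentException("unknown consumer " + consumerId);
        }
    }

    boolean allProcessed() {
        return !processedByConsumer.containsValue(false);
    }
}

/** Hypothetical stand-in for the row-local index of one row, ordered by sequence number. */
final class RowLocalIndex {
    private final TreeMap<Long, ExecutionState> messages = new TreeMap<>();
    private long nextSeqNr = 0;

    /** Adds a message for the given consumers and returns its sequence number. */
    long add(int... consumerIds) {
        long seqNr = nextSeqNr++;
        messages.put(seqNr, new ExecutionState(consumerIds));
        return seqNr;
    }

    /** Marks the message as processed by one consumer; returns true if it was removed. */
    boolean markProcessed(long seqNr, int consumerId) {
        ExecutionState state = messages.get(seqNr);
        if (state == null) {
            throw new IllegalArgumentException("no open message " + seqNr);
        }
        state.markProcessed(consumerId);
        if (state.allProcessed()) {
            messages.remove(seqNr);
            return true;
        }
        return false;
    }

    int openMessages() {
        return messages.size();
    }

    /** Sequence number of the oldest open message, or null if there is none. */
    Long oldestOpen() {
        return messages.isEmpty() ? null : messages.firstKey();
    }
}
```

The message only disappears from the index once the last registered consumer has reported it as processed.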
Before a message can be processed by a consumer it should be locked by
requesting the RowLog to lock it. Since multiple instances of a consumer can
be running in the system, this lock prevents two consumers from processing the
same message at the same time. The indication that a message is locked is also
stored inside the execution state of the message on the row-local index.
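The effect of such a lock can be sketched with an atomic "set if absent" operation. The class `MessageLocks` below is hypothetical and keeps the lock in memory; the real RowLog stores it in the execution state on the row, presumably through some atomic conditional write, which is an assumption on our side of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory lock table: at most one owner per message at a time. */
final class MessageLocks {
    private final ConcurrentHashMap<String, String> ownerByMessage = new ConcurrentHashMap<>();

    /** Tries to lock the message for the given owner; returns true on success. */
    boolean lock(String messageId, String owner) {
        // putIfAbsent is atomic: only the first caller sees null
        return ownerByMessage.putIfAbsent(messageId, owner) == null;
    }

    /** Releases the lock, but only if it is held by the given owner. */
    boolean unlock(String messageId, String owner) {
        return ownerByMessage.remove(messageId, owner);
    }

    boolean isLocked(String messageId) {
        return ownerByMessage.containsKey(messageId);
    }
}
```

A consumer instance would only process a message after `lock` returned true, and would call `unlock` once it is done.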
Messages can be processed in two ways.
A first way is to request the RowLog to process a message explicitly. The RowLog will then request each registered consumer to process this particular message.
A second way is through a RowLogProcessor. A RowLogProcessor is related to a specific RowLogShard. For this shard, it picks the next message to be processed for each registered consumer and feeds it to that consumer.
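The second way can be sketched as a loop over the registered consumers. All types below (`SketchConsumer`, `SketchShard`, `SketchProcessor`) are hypothetical simplifications with string messages and an in-memory shard; they only illustrate the "next message per consumer" idea and are not the RowLogProcessorImpl logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified consumer contract for this sketch. */
interface SketchConsumer {
    int getId();

    boolean processMessage(String message);
}

/** In-memory stand-in for a shard: per consumer, pending messages ordered by timestamp. */
final class SketchShard {
    private final Map<Integer, TreeMap<Long, String>> pendingByConsumer = new HashMap<>();

    void put(long timestamp, String message, List<SketchConsumer> consumers) {
        for (SketchConsumer consumer : consumers) {
            pendingByConsumer.computeIfAbsent(consumer.getId(), id -> new TreeMap<>())
                    .put(timestamp, message);
        }
    }

    /** Oldest pending message for the consumer, or null if there is none. */
    Map.Entry<Long, String> next(int consumerId) {
        TreeMap<Long, String> pending = pendingByConsumer.get(consumerId);
        return pending == null ? null : pending.firstEntry();
    }

    void done(int consumerId, long timestamp) {
        TreeMap<Long, String> pending = pendingByConsumer.get(consumerId);
        if (pending != null) {
            pending.remove(timestamp);
        }
    }
}

/** One processor per shard: feeds each consumer its next pending message. */
final class SketchProcessor {
    private final SketchShard shard;
    private final List<SketchConsumer> consumers;

    SketchProcessor(SketchShard shard, List<SketchConsumer> consumers) {
        this.shard = shard;
        this.consumers = new ArrayList<>(consumers);
    }

    /** One pass over all consumers; returns the number of messages processed successfully. */
    int runOnce() {
        int processed = 0;
        for (SketchConsumer consumer : consumers) {
            Map.Entry<Long, String> next = shard.next(consumer.getId());
            if (next != null && consumer.processMessage(next.getValue())) {
                shard.done(consumer.getId(), next.getKey());
                processed++;
            }
        }
        return processed;
    }
}
```

A message that a consumer fails to process stays pending for that consumer and is offered again on the next pass, while other consumers move on independently.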
The RowLog HBase schema
As mentioned, the RowLog component is used in Lily to implement both the WAL and MQ functionality.
For the WAL functionality we create one instance of the RowLog and provide it
with the information about record-table, and the column families that can be
used for the row-local index on that table.
On the RowLog a RowLogShard where the messages can be stored is registered.
(Currently we only support one shard.)
The only secondary task we currently provide is the task to put a message on the message queue (see below) for an indexer to pick up and perform an update on the SOLR/Lucene index. For this we create a consumer, an implementation of the RowLogMessageConsumer interface, which we call the MessageQueueFeeder. This consumer is registered with the RowLog.
When an update happens on a Lily record, a calculation is done of the changes that need to be performed on the record's data itself, and which information should be put in the payload for the indexer. A request is then sent to put the message on the RowLog. The RowLog then puts a message on the RowLogShard and also performs a put operation on the record, storing the record's data and row-local message info in one atomic operation.
At this moment, from the client's perspective, the update of the record was performed successfully.
Immediately after the update, the RowLog is requested to process the message, which will result in putting a message on the MQ.
Due to a failure, it could be that the RowLog was not (yet) requested to process the message. If a new update comes in, it will first try to process the open message. A RowLogProcessor will also be running. This processor feeds any messages that have not been processed by an explicit call to the consumers, in this case our MessageQueueFeeder secondary task. A difference with the MQ described below is that messages and the related secondary tasks need to be processed in order.
For the MQ functionality we create a second instance of the RowLog. We
provide it with the same record-table information, since the MQ messages will
be about the same records as for the WAL. Different column families, specific
to the MQ, are provided for the row-local index on that table.
A RowLogShard is also registered on this RowLog.
The MessageQueueFeeder, the secondary task from the WAL, will put messages on the MQ.
The only consumer we currently have is the indexer, which performs updates on the SOLR/Lucene index. This indexer is an implementation of the RowLogMessageConsumer interface and is registered with the RowLog.
The MQ can work in an asynchronous way. So no explicit request is sent to the RowLog to process the message that has just been put.
Instead, a RowLogProcessor will pick up the messages on the shard and feed them to the indexer (consumer) for processing.
The code is a first iteration and some rough edges are to be expected but we
already want to share this with you to show the direction we are working in. New
insights might lead to partial re-designs and some functionality is not
implemented yet.
Some todo's:
License? Apache!
What else?
Also, we welcome contributions!
The latest RowLog code is now available from Lily's source tree. The downloads will no longer be maintained.
The project can be found in the rowlog subdirectory of the Lily source tree, its dependencies can be found in the pom.xml.
To get & build the code, use:
svn checkout http://dev.outerthought.org/svn_public/outerthought_lilycms lily-trunk
mvn -Pfast install
Download (application/x-gzip, 1.1 MB, info)
Be sure to read the javadocs for the org.lilycms.rowlog package.
The code was developed against HBase trunk (= 0.21).
To get started, you will need the following on the classpath:
- lily-util-0.1-dev.jar
- lily-rowlog-api-0.1-dev.jar
- lily-rowlog-impl-0.1-dev.jar
- jackson-mapper-asl-1.5.0.jar
- jackson-core-asl-1.5.0.jar
- hbase-0.21.0-dev.jar
- hadoop-core-0.20.2-with-200-826.jar
- zookeeper-3.3.1.jar
- commons-logging-1.1.1.jar
- log4j-1.2.15.jar
The dependencies are included, except for hbase and hadoop-core, for which it is recommended to use the jars from the actual HBase/Hadoop version that you are using.
Below is some example code showing how to instantiate and use the RowLog.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.lilycms.rowlog.api.RowLog;
import org.lilycms.rowlog.api.RowLogException;
import org.lilycms.rowlog.api.RowLogMessage;
import org.lilycms.rowlog.api.RowLogMessageConsumer;
import org.lilycms.rowlog.api.RowLogProcessor;
import org.lilycms.rowlog.api.RowLogShard;
import org.lilycms.rowlog.impl.RowLogImpl;
import org.lilycms.rowlog.impl.RowLogProcessorImpl;
import org.lilycms.rowlog.impl.RowLogShardImpl;

public class Example {
    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

    public static void main(String[] args) throws Exception {
        TEST_UTIL.startMiniCluster(1);
        Configuration configuration = TEST_UTIL.getConfiguration();

        // Create the row table
        final String ROW_TABLE = "rowTable";
        final byte[] DATA_COLUMN_FAMILY = Bytes.toBytes("DataCF");
        final byte[] PAYLOAD_COLUMN_FAMILY = Bytes.toBytes("PAYLOADCF");
        final byte[] EXECUTIONSTATE_COLUMN_FAMILY = Bytes.toBytes("ESLOGCF");
        HBaseAdmin admin = new HBaseAdmin(configuration);
        HTableDescriptor tableDescriptor = new HTableDescriptor(ROW_TABLE);
        tableDescriptor.addFamily(new HColumnDescriptor(DATA_COLUMN_FAMILY));
        tableDescriptor.addFamily(new HColumnDescriptor(PAYLOAD_COLUMN_FAMILY));
        tableDescriptor.addFamily(new HColumnDescriptor(EXECUTIONSTATE_COLUMN_FAMILY));
        admin.createTable(tableDescriptor);
        HTable rowTable = new HTable(configuration, ROW_TABLE);

        // Create a RowLog instance
        RowLog rowLog = new RowLogImpl(rowTable, PAYLOAD_COLUMN_FAMILY, EXECUTIONSTATE_COLUMN_FAMILY, 1000L);

        // Create a shard and register it with the rowlog
        RowLogShard shard = new RowLogShardImpl("AShard", configuration, rowLog);
        rowLog.registerShard(shard);

        // Create a consumer and register it with the RowLog
        RowLogMessageConsumer consumer = new FooBarConsumer();
        rowLog.registerConsumer(consumer);

        //
        // The WAL use case
        //
        // Update a row with some user data
        // and put a message on the RowLog using the same put action
        byte[] row1 = Bytes.toBytes("row1");
        // Before updating a row, the WAL should first check if there are any open messages
        // on the row that need to be processed first.
        // Utility methods on the RowLog to enable this scenario are still to be implemented.
        Put put = new Put(row1);
        put.add(DATA_COLUMN_FAMILY, Bytes.toBytes("AUserField"), Bytes.toBytes("SomeUserData"));
        RowLogMessage message;
        message = rowLog.putMessage(row1, Bytes.toBytes("SomeInfo"), Bytes.toBytes("Updated:AUserField"), put);
        rowTable.put(put);
        // Explicitly request the RowLog to process the message
        rowLog.processMessage(message);

        //
        // The MQ use case
        //
        // Create a processor and start it
        RowLogProcessor processor = new RowLogProcessorImpl(rowLog, shard);
        processor.start();
        message = rowLog.putMessage(row1, Bytes.toBytes("SomeMoreInfo"), Bytes.toBytes("Re-evaluate:AUserField"), null);
        // Give the processor some time to process the message
        Thread.sleep(30000);
        processor.stop();
        TEST_UTIL.shutdownMiniCluster();
    }

    private static class FooBarConsumer implements RowLogMessageConsumer {
        public static final int ID = 1;

        public FooBarConsumer() {
        }

        public int getId() {
            return ID;
        }

        public boolean processMessage(RowLogMessage message) {
            System.out.println("= Received a message =");
            System.out.println(Bytes.toString(message.getRowKey()));
            System.out.println(Bytes.toString(message.getData()));
            try {
                System.out.println(Bytes.toString(message.getPayload()));
            } catch (RowLogException e) {
                // ignore
            }
            return true;
        }
    }
}
We are interested in hearing what you think of this library. You can write to the Lily mailing list.