Introducing Selium Log

Aden Herold
May 1, 2024

An introduction to fast, ordered message retention in Selium.

Message retention has finally landed for Selium, thanks to Selium Log!

Selium Log lays the foundation for complex streaming use-cases, while crossing off another huge milestone for Selium, with many more to come in the following months.

I'm excited to introduce Selium Log and give you all a brief look into our architecture without getting overly technical, so let's get started!

Justification

Previously, Selium was limited to temporary, in-memory message queuing. As messages were produced, they would be destroyed after attempted delivery. While this certainly afforded considerable amounts of throughput, the destructive, forgetful nature of the in-memory queue meant that the scope of applications using Selium was severly limited. We set out to address the following shortcomings of our current implementation.

Message Retention

The previous message broker did not persist any messages to either the filesystem or an in-memory cache, and would rather retain a buffer containing the current message to deliver. The message would then be popped from the buffer before attempting delivery to all connected subscribers, and the broker would forget all about it. This is more than fine, if not ideal for use-cases that require high throughput while allowing for the occasional missed delivery, but is a show-stopper for complex applications that need to retain a history of all produced messages.

Message Replay

Previously, subscribers were only privy to messages produced following their subscription to the topic. Once messages were popped from the queue to attempt delivery, they were destroyed. This made Selium unsuitable for any applications requiring playback of older messages, such as chat, financial and logging applications, to name a few examples.

Message Ordering

Message Ordering is a crucial feature for many critical event-based applications. For example, medical and financial applications would certainly like to be notified of events in the order that they arrive, in order to accurately monitor patient insulin levels, or prevent inaccurate financial forecasts, respectively.

Enter Selium Log

With the introduction of Selium Log, we believe that many of our previous broker implementation's shortcomings have either been addressed, or can now easily be addressed in future updates.

Message Retention

Selium Log allows retention of produced messages based on a retention policy provided by the Selium Client user, or Selium Log library author. Messages are persisted in the filesystem until they are deemed stale, before they are cleaned up by an asynchronous background task. In the following code, message retention is configured while establishing a producer stream:

Let's say we want to retain log segments for up to seven days. Note that this is the default retention period!


let publisher = connection
    .publisher("/acmeco/stocks")
    .with_encoder(StocksEncoder)
    .retain(chrono::Duration::from_days(7))?
    .open()
    .await?;

The retain method will accept any value that can be fallibly converted into a u64, so we're not limited to chrono! For example, let's express the same retention period, but with the standard library's Duration type.


let publisher = connection
    .publisher("/acmeco/stocks")
    .with_encoder(StocksEncoder)
    .retain(std::time::Duration::from_secs(60 * 60 * 24 * 7))?
    .open()
    .await?;

How about just passing the retention period directly as a u64?


const SEVEN_DAYS_IN_SECONDS: u64 = 60 * 60 * 24 * 7;

let publisher = connection
    .publisher("/acmeco/stocks")
    .with_encoder(StocksEncoder)
    .retain(SEVEN_DAYS_IN_SECONDS)?
    .open()
    .await?;

Message Replay

Selium Log solves this shortcoming by allowing subscribers in client code to optionally specify a log offset to start reading messages from. This is accomplished with the Offset type while establishing a subscriber stream. In the following example code, there are a few different examples of usage.

Let's begin consuming from the very beginning of the log. We don't want to miss a thing!


let subscriber = connection
    .subscriber("/acmeco/stocks")
    .with_decoder(StocksDecoder)
    .seek(Offset::FromStart(0))
    .open()
    .await?;

Now we'll start reading from 1,000 messages from the end of the log.


let subscriber = connection
    .subscriber("/acmeco/stocks")
    .with_decoder(StocksDecoder)
    .seek(Offset::FromEnd(1000))
    .open()
    .await?;

What if we don't care about the historical events in the log, and just want to catch new messages as they come in? It should be noted that this is the default behaviour if no seek offset is provided!


let subscriber = connection
    .subscriber("/acmeco/stocks")
    .with_decoder(StocksDecoder)
    .seek(Offset::FromEnd(0))
    .open()
    .await?;

Currently, messages can only be replayed by providing a numeric offset, but a future update will introduce replaying messages from a specific timestamp. The proposed API to start replaying messages from 7 days ago may look like the following:


let subscriber = connection
    .subscriber("/acmeco/stocks")
    .with_decoder(StocksDecoder)
    .seek(Offset::FromTimestamp(Duration::from_days(7)))
    .open()
    .await?;

Message Ordering

The previous broker implementation does feature message ordering, but it comes with an added runtime cost that is hard to ignore. With Selium Log, due to the sequential, atomic nature of the log, message ordering is inherent to the design, and is thus provided free of charge!

Delivery Guarantees

An important feature of any message queue is strong delivery guarantees. While full support for the different tiers of guarantees is still on the horizon, Selium Log lays down the foundation to allow publishers to easily apply backpressure when the most strict guarantees are required, and facilite simple retries upon failed delivery to subscribers.

Architecture

Developers possessing a background with Apache Kafka will find the Selium Log architecture to be quite familiar! In the following sections, I'll attempt to describe each component of the log.

Segment

Our logging implementation is represented by one or more segments, with each segment comprised of a memory-mapped index file, serving as a lookup for the segment's records, and an append-only data file containing said records.

Only the most current segment can be designated as the mutable, "hot" segment, while older segments are read-only, and are retained until their eventual cleanup. Likewise, once the current hot segment exceeds a defined threshold, it will also be marked as read-only, while a new segment is created and assigned as the hot segment in its place.

When a segment is created, two files will be created in the directory pertaining to that log, which will represent the index and data files respectively. The files will use the naming convention of <base-offset>.index and <base-offset>.data, respectively (I'll explain exactly what the base-offset is soon enough!). When a log is newly created, the initial hot segment will always be created, so a fresh log directory for the acmeco/stocks topic should look like the following:

Segments will eventually become full, depending on the configured LogConfig::max_index_entries setting, so new segments will be created to keep write buffers at reasonable sizes, allow efficient seeking, and reduce overhead when synchronizing the memory-mapped index with the filesystem.

For example, for a log with segments no larger than 10,000 entries, the log directory may look like the following, depending on the maturity of the log:

Stale Segments

Nothing can last forever. The same can be said for log segments. It's all well and good to spawn segment after segment as new messages are commited to the log, and continue chugging along, but eventually, storage costs will increase, and ultimately, the filesystem will be overwhelmed.

To alleviate this issue, a retention period can be specified for log segments, to prevent logs from growing too large and/or retaining stale data. When logs are created or resumed, a background task is spawned, which asynchronously scans for stale log segments and removes them from the filesystem. A segment is considered stale when the last modified timestamp in the metadata of the data file falls behind the specified retention period.

As an example, let's consider a retention policy of 7 days. As of writing this article, it is currently 3PM AEST on 01/05/2024, so the UNIX timestamp for 7 days ago is 1713934800. The following table displays each segment in a log along with their last modified time. Each segment is marked stale if its Last Modified Timestamp is less than 1713934800, which is true for offsets 0, 10000, and 20000.

Flushing

Replication has not yet been implemented for Selium Log as of this release, but is a planned feature.

Due to this shortcoming, durability can be tough to achieve alongside high throughput on a single node. Most of the latency comes from the I/O overhead of flushing the memory-mapped index and data files to the filesystem.

To compensate for this overhead, the flushing frequency can be tweaked via the FlushPolicy struct, to strike a balance between durability and throughput. Through constructing a FlushPolicy, users can determine the flushing interval, and optionally, a write-count threshold to trigger flushing.

Tweaking these settings can greatly increase throughput, but since messages aren't delivered until they're successfully flushed, it comes at the cost of possibly losing a certain amount of messages. For example, if the flushing interval is 5 minutes, it's possible to lose up to 5 minutes of messages upon an unexpected critical failure, as the write-buffer will not be flushed to the filesystem. Use FlushPolicy::every_write if durability is a critical concern.

Index

The index is a memory-mapped file that serves as a lookup directory of all messages in the segment, describing where any messages in the log can be located via byte offset. Given a relative offset, a fast lookup of a byte offset in the data file can be performed via a binary search. The index is comprised of one or more entries that are each 20 bytes in length, with the following structure:

A note on offsets

I've mentioned base offsets and relative offsets quite a bit now, so what are they, really? Every segment starts from a base offset, starting from offset 0, with each entry in the index having its own offset relative to that base offset, beginning from 1.

When an index is full, a new segment will be created using the next offset as the new base offset. A stripped down, visual representation of these offsets would look like the following:

As you can see, regardless of the base index for the segment, the relative offset for the index will always start from 1, as it is relative to that segment. This makes lookups trivial, as you can easily locate an index via a comparison with the base offset, and then subtract the difference to determine the relative offset.

Using the example above, to locate a message with an offset of 10,301, the log implementation would perform the following comparisons against the index base offsets:

  1. Is 10,301 smaller than 20,000? Yes! This is also not the index we're after.
  2. Is 10,301 smaller than 10,000? Nope! This is the index to consult!

The relative offset is then calculated using the following formula:


offset - base = relative

The above example would determine the relative offset as follows:


10,301 - 10,000 = 301

We can then ask 10000.index to fetch the byte offset for the entry with a relative offset of 301. We now learn that the message is located at byte offset 9,632 in 10000.data!

Data File

The data file is quite simple: the messages are encoded into bytes before being stored sequentially in this file. The data file is seldom accessed without first consulting the index file for an initial byte offset.

Message Format

The message frame contains information required to parse the message, a calculated CRC used to verify message integrity, and the encoded records.

Don't Need Retention?

If after reading all of the above, you've decided that log-based messaging isn't important for your use-case with Selium, fear not!

Selium Log won't be the only messaging style available in Selium, as we are working towards improving our previous high-throughput, forgetful message solution, so we can offer it as a secondary messaging style for publishers who wish to opt-out of retention.

Open Source

Because we love open-source at Selium, our selium-log crate is entirely open source, so If you have an application that could make use of a lightweight, competent log-based message queue, then please feel free! No Selium dependency required!

Simply add the selium-log dependency to cargo.toml, and you're good to go. An example of usage is as follows:


use anyhow::Result;
use selium_log::{config::LogConfig, message::Message, MessageLog};
use std::sync::Arc;

const MESSAGE_VERSION: u32 = 1;

#[tokio::main]
async fn main() -> Result<()> {
    let config = LogConfig::from_path("path/to/segments/dir");
    let log = MessageLog::open(Arc::new(config)).await?;
    let message = Message::single(b"Hello, world!", MESSAGE_VERSION);

    log.write(message).await?;
    log.flush().await?;
    let slice = log.read_slice(0, None).await?;

    if let Some(mut iter) = slice.messages() {
        let next = iter.next().await?;
        println!("{next:?}")
    }

    Ok(())
}

For extended usage, you can dig into the selium-log documentation. Contributions are always welcome, so feel free to open a PR in our repository.

Some Final Words

In closing, we're very excited to see what you build with Selium and Selium Log. We still have awesome things cooking, which we can't wait to share, so until then, thanks for tuning in, and watch this space!