Multi-Topic Broadcast

Babl can broadcast a message to multiple topics simultaneously. Coupled with the ability to transform messages within the session container, this allows for highly scalable delivery of data flows such as market data.

The same result could be achieved, inefficiently, by iterating over sessions in the application. Instead, Babl provides a mechanism for publishing a single message from the application that is then routed to multiple sessions in the session container.
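The routing idea can be sketched with a small, self-contained model. This is an illustration only and not Babl's implementation: the class `TopicFanOutSketch`, its in-memory map, and the string "deliveries" it returns are all hypothetical, chosen to show that the application makes one call while the fan-out to each subscribed session happens elsewhere.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of topic fan-out; none of this is Babl code.
final class TopicFanOutSketch
{
    // topicId -> ids of the sessions subscribed to that topic
    private final Map<Integer, Set<Long>> sessionsByTopic = new HashMap<>();

    void createTopic(final int topicId)
    {
        sessionsByTopic.putIfAbsent(topicId, new LinkedHashSet<>());
    }

    void addToTopic(final int topicId, final long sessionId)
    {
        final Set<Long> sessions = sessionsByTopic.get(topicId);
        if (sessions == null)
        {
            throw new IllegalArgumentException("Unknown topic: " + topicId);
        }
        sessions.add(sessionId);
    }

    // The caller publishes once; routing to each member session happens here.
    List<String> sendToTopics(final int[] topicIds, final String message)
    {
        final List<String> deliveries = new ArrayList<>();
        for (final int topicId : topicIds)
        {
            final Set<Long> sessions =
                sessionsByTopic.getOrDefault(topicId, Collections.emptySet());
            for (final long sessionId : sessions)
            {
                deliveries.add(
                    "topic=" + topicId + " session=" + sessionId + " msg=" + message);
            }
        }
        return deliveries;
    }

    public static void main(final String[] args)
    {
        final TopicFanOutSketch sketch = new TopicFanOutSketch();
        sketch.createTopic(1);
        sketch.createTopic(2);
        sketch.addToTopic(1, 10L);
        sketch.addToTopic(1, 11L);
        sketch.addToTopic(2, 10L);
        // a single publish call results in three deliveries
        sketch.sendToTopics(new int[] {1, 2}, "tick").forEach(System.out::println);
    }
}
```

In this model a session subscribed to two topics receives the message once per topic, which is the behaviour that makes per-topic transformation meaningful.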

Using Multi-Topic Broadcast

To use the multi-topic broadcast function, your application class should implement the BroadcastSource interface.

When Babl is launched, your application will be provided with an implementation of the Broadcast interface that can be used for topic management and the sending of broadcast messages.

An example of using multi-topic broadcast can be seen below:

private static final class MarketDataApplication
    implements Application, BroadcastSource
{
    private Broadcast broadcast;
    private int[] topicIds;

    @Override
    public void setBroadcast(final Broadcast broadcast)
    {
        // store the Broadcast implementation
        this.broadcast = broadcast;
        this.topicIds = new int[] {
            MARKET_DATA_FULL_DEPTH, MARKET_DATA_TOP_OF_BOOK};
        // create the topics used for broadcast
        broadcast.createTopic(MARKET_DATA_FULL_DEPTH);
        broadcast.createTopic(MARKET_DATA_TOP_OF_BOOK);
    }

    @Override
    public int onSessionConnected(final Session session)
    {
        // add new sessions to the topic
        broadcast.addToTopic(MARKET_DATA_TOP_OF_BOOK, session.id());
        return SendResult.OK;
    }

    public void onMarketDataUpdate(final MarketDataUpdate update)
    {
        final DirectBuffer buffer = serialise(update);
        // send a message to all sessions registered on the topics
        broadcast.sendToTopics(topicIds, buffer, 0, buffer.capacity());
    }
}

To make this feature useful, messages can be transformed on a per-topic basis in the session container. To do this, configure a MessageTransformer implementation:

final class MarketDataTransformer
    implements MessageTransformer
{
    private final ExpandableArrayBuffer dst =
        new ExpandableArrayBuffer();
    private final TransformResult transformResult =
        new TransformResult();

    @Override
    public TransformResult transform(
        final int topicId,
        final DirectBuffer input,
        final int offset,
        final int length)
    {
        if (topicId == MARKET_DATA_TOP_OF_BOOK)
        {
            transformResult.set(dst, 0, encodeTopOfBook(input, offset, length));
        }
        else
        {
            transformResult.set(dst, 0, encodeFullDepth(input, offset, length));
        }
        return transformResult;
    }
}

Message transformers are supplied to the session container config:

new BablConfig().sessionContainerConfig()
    .messageTransformerFactory(topicId -> new MarketDataTransformer());
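To show per-topic transformation in isolation, the following is a simplified, self-contained model that does not depend on Babl. The `Transformer` interface, the `publish` method, and the comma-separated payload format (price levels, best price first) are assumptions made for illustration. Only the topic names and the `topicId -> new MarketDataTransformer()` factory shape mirror the examples above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Simplified model only; none of these types are Babl classes.
final class PerTopicTransformSketch
{
    static final int MARKET_DATA_FULL_DEPTH = 1;
    static final int MARKET_DATA_TOP_OF_BOOK = 2;

    // Hypothetical stand-in for a per-topic message transformer.
    interface Transformer
    {
        byte[] transform(int topicId, byte[] input);
    }

    // Assumed payload format: comma-separated price levels, best price first.
    static final class MarketDataTransformer implements Transformer
    {
        @Override
        public byte[] transform(final int topicId, final byte[] input)
        {
            if (topicId != MARKET_DATA_TOP_OF_BOOK)
            {
                // full-depth subscribers receive the update unchanged
                return input;
            }
            final String levels = new String(input, StandardCharsets.US_ASCII);
            final int firstComma = levels.indexOf(',');
            final String best = firstComma < 0 ? levels : levels.substring(0, firstComma);
            return best.getBytes(StandardCharsets.US_ASCII);
        }
    }

    // Publishes one update and returns the payload produced for each topic.
    // This sketch asks the factory for a transformer on every publish; a real
    // container would more likely create one per topic and reuse it.
    static Map<Integer, String> publish(
        final int[] topicIds,
        final IntFunction<Transformer> transformerFactory,
        final String update)
    {
        final byte[] input = update.getBytes(StandardCharsets.US_ASCII);
        final Map<Integer, String> payloadByTopic = new LinkedHashMap<>();
        for (final int topicId : topicIds)
        {
            final Transformer transformer = transformerFactory.apply(topicId);
            final byte[] output = transformer.transform(topicId, input);
            payloadByTopic.put(topicId, new String(output, StandardCharsets.US_ASCII));
        }
        return payloadByTopic;
    }

    public static void main(final String[] args)
    {
        final Map<Integer, String> result = publish(
            new int[] {MARKET_DATA_FULL_DEPTH, MARKET_DATA_TOP_OF_BOOK},
            topicId -> new MarketDataTransformer(),
            "100.5,100.4,100.3");
        System.out.println(result);
    }
}
```

In this model the application publishes the full update once. The full-depth topic receives all three levels, while the top-of-book topic receives only the first.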