Babl can broadcast a single message to multiple topics simultaneously. Coupled with the ability to transform messages within the session container, this allows for highly scalable delivery of data flows such as market data.
The same result could be achieved, inefficiently, by iterating over sessions in the application. Instead, Babl lets the application publish a single message that is routed to multiple sessions in the Session Container.
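The routing idea can be pictured with a small toy model. This is only a sketch under stated assumptions: the class TopicFanOutSketch, its String payloads, and its deliveries list are hypothetical and are not part of Babl, and it says nothing about how Babl implements routing internally. It only shows how one send call can reach every session registered on each of several topics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: these types are hypothetical and are not part of Babl.
final class TopicFanOutSketch
{
    // topicId -> ids of the sessions registered on that topic
    private final Map<Integer, Set<Long>> sessionsByTopic = new HashMap<>();
    // records each delivery as "sessionId:topicId:payload" so it can be inspected
    final List<String> deliveries = new ArrayList<>();

    void createTopic(final int topicId)
    {
        sessionsByTopic.putIfAbsent(topicId, new LinkedHashSet<>());
    }

    void addToTopic(final int topicId, final long sessionId)
    {
        final Set<Long> sessions = sessionsByTopic.get(topicId);
        if (sessions == null)
        {
            throw new IllegalArgumentException("Unknown topic: " + topicId);
        }
        sessions.add(sessionId);
    }

    // A single call from the application reaches every session on each topic.
    int sendToTopics(final int[] topicIds, final String payload)
    {
        int delivered = 0;
        for (final int topicId : topicIds)
        {
            final Set<Long> sessions =
                sessionsByTopic.getOrDefault(topicId, Collections.emptySet());
            for (final long sessionId : sessions)
            {
                deliveries.add(sessionId + ":" + topicId + ":" + payload);
                delivered++;
            }
        }
        return delivered;
    }
}
```

In this model the application never loops over sessions itself; it names the topics and the container-side structure does the fan-out.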
Using Multi-Topic Broadcast
To use the multi-topic broadcast function, your application class should implement the BroadcastSource interface.
When Babl is launched, your application will be provided with an implementation of the Broadcast interface that can be used for topic management and the sending of broadcast messages.
An example of using multi-topic broadcast can be seen below:
private static final class MarketDataApplication
    implements Application, BroadcastSource
{
    private Broadcast broadcast;
    private int[] topicIds;

    @Override
    public void setBroadcast(final Broadcast broadcast)
    {
        // store the Broadcast implementation
        this.broadcast = broadcast;
        this.topicIds = new int[] {
            MARKET_DATA_FULL_DEPTH, MARKET_DATA_TOP_OF_BOOK};
        // create one topic per market-data view
        broadcast.createTopic(MARKET_DATA_FULL_DEPTH);
        broadcast.createTopic(MARKET_DATA_TOP_OF_BOOK);
    }

    @Override
    public int onSessionConnected(final Session session)
    {
        // register each new session on the top-of-book topic
        broadcast.addToTopic(MARKET_DATA_TOP_OF_BOOK, session.id());
        return SendResult.OK;
    }

    public void onMarketDataUpdate(final MarketDataUpdate update)
    {
        final DirectBuffer buffer = serialise(update);
        // send one message to all sessions registered on the listed topics
        broadcast.sendToTopics(topicIds, buffer, 0, buffer.capacity());
    }
}
To make this feature useful, messages can be transformed on a per-topic basis in the session container. To do this, the application developer should configure a MessageTransformer implementation:
final class MarketDataTransformer
    implements MessageTransformer
{
    private final ExpandableArrayBuffer dst =
        new ExpandableArrayBuffer();
    private final TransformResult transformResult =
        new TransformResult();

    @Override
    public TransformResult transform(
        final int topicId,
        final DirectBuffer input,
        final int offset,
        final int length)
    {
        if (topicId == MARKET_DATA_TOP_OF_BOOK)
        {
            transformResult.set(dst, 0, encodeTopOfBook(input, offset, length));
        }
        else
        {
            transformResult.set(dst, 0, encodeFullDepth(input, offset, length));
        }
        return transformResult;
    }
}
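The encodeTopOfBook and encodeFullDepth helpers are not shown in the transformer above. As a rough illustration of what such helpers might do, the sketch below assumes a hypothetical wire format (an int level count followed by price and quantity pairs, best level first). It uses java.nio.ByteBuffer in place of Agrona's DirectBuffer so that it stands alone, and it takes the destination buffer as an extra parameter. None of these names or the format come from Babl.

```java
import java.nio.ByteBuffer;

// Hypothetical wire format, assumed for illustration only:
//   int levelCount, followed by levelCount x (long price, long quantity)
final class MarketDataEncodingSketch
{
    static final int HEADER_LENGTH = Integer.BYTES;
    static final int LEVEL_LENGTH = 2 * Long.BYTES;

    // Copies the whole message into dst at index 0 and returns the encoded length.
    static int encodeFullDepth(
        final ByteBuffer input, final int offset, final int length, final ByteBuffer dst)
    {
        for (int i = 0; i < length; i++)
        {
            dst.put(i, input.get(offset + i));
        }
        return length;
    }

    // Keeps only the first (best) level and returns the encoded length.
    static int encodeTopOfBook(
        final ByteBuffer input, final int offset, final int length, final ByteBuffer dst)
    {
        if (length < HEADER_LENGTH)
        {
            throw new IllegalArgumentException("Message too short: " + length);
        }
        final int levelsToCopy = Math.min(1, input.getInt(offset));
        final int encodedLength = HEADER_LENGTH + levelsToCopy * LEVEL_LENGTH;
        if (length < encodedLength)
        {
            throw new IllegalArgumentException("Truncated level data: " + length);
        }
        dst.putInt(0, levelsToCopy);
        for (int i = HEADER_LENGTH; i < encodedLength; i++)
        {
            dst.put(i, input.get(offset + i));
        }
        return encodedLength;
    }
}
```

Under this assumed format a two-level message of 36 bytes shrinks to 20 bytes for top-of-book subscribers, while full-depth subscribers receive all 36 bytes.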
Message transformers are supplied to the session container config:
new BablConfig().sessionContainerConfig()
    .messageTransformerFactory(topicId -> new MarketDataTransformer());