Simple, Distributed and Scalable PubSub in Erlang

This blog post shows how to build highly scalable, distributed, messaging-based applications using ErlBus, a lightweight and simple library designed for exactly that purpose.

As of the current release, 0.2.0 (in progress), ErlBus has improved substantially. The current PubSub implementation is taken from the original, remarkable, and proven Phoenix PubSub layer, rewritten in Erlang.

You may be wondering: why not include Phoenix as a dependency and call it from Erlang? Well, there are a few reasons:

  1. Phoenix is a whole framework with several different modules, not only PubSub, so pulling it in as a dependency fetches all of its sub-dependencies too. That probably isn’t a big deal, but it means bringing in all of Phoenix to use only about 5% of it (the rest is wasted). Besides, PubSub is a simple, small, and great piece of software (its architecture and design are pretty good), so the goal was to have only that single, specific module to handle messaging, not the whole web framework.

  2. Maintainability. We wanted the chance to maintain a lightweight Erlang PubSub version and evolve it independently.

  3. The build gets more complex: you need not only Erlang but also Elixir, and you have to make sure that your own dependencies and all the ones Phoenix brings with it compile correctly. Dealing with a single platform is easier than dealing with two or more.

ErlBus Inside

As we explained before, ErlBus is the Erlang clone of Phoenix PubSub, so the architecture and design are exactly the same.

Thanks to the Phoenix PubSub architecture, ErlBus scales out well, not only globally (across a cluster) but also locally. When an ErlBus instance starts, it starts N shards, where N is set by the pool_size config parameter (pool_size = 1 by default). The easiest way to get a better idea of this is to run ErlBus.
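
To make the sharding idea more concrete, here is a minimal sketch of how subscribers could be spread over pool_size shards by hashing the subscriber pid. The module and function names (shard_sketch, shard_for/2, distribution/2) are hypothetical and not part of ErlBus, and the hashing scheme is an assumption made for illustration, not a description of the library’s internals.

```erlang
-module(shard_sketch).
-export([shard_for/2, distribution/2]).

%% Hypothetical helper: map a subscriber pid to a shard index in
%% the range 0..PoolSize-1 using the built-in erlang:phash2/2.
shard_for(Pid, PoolSize) when is_pid(Pid), is_integer(PoolSize), PoolSize > 0 ->
    erlang:phash2(Pid, PoolSize).

%% Count how many of the given pids land on each shard.
%% Returns a map of ShardIndex => Count.
distribution(Pids, PoolSize) ->
    lists:foldl(
      fun(Pid, Acc) ->
              Shard = shard_for(Pid, PoolSize),
              maps:update_with(Shard, fun(N) -> N + 1 end, 1, Acc)
      end,
      #{},
      Pids).
```

With a pool size of 1 every subscriber maps to shard 0; a larger pool spreads subscribers over several shards, so several local servers can share the work.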

Let’s build and start ErlBus:

$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make shell

Now we should have ebus (or ErlBus) running in an Erlang console. From within the console, let’s start the observer:

observer:start().

You should see a process tree like this:

ErlBus Process Tree.

In the image above, we see the main supervisor ebus_ps_pg2, which supervises ebus_ps_pg2_server and ebus_ps_local_sup. The ebus_ps_local_sup supervises the local shards, and each shard has its own supervisor, ebus_supervisor, which supervises the garbage collector ebus_ps_gc and the local PubSub server ebus_ps_local. All of this runs on each instance of ebus.

The next step is to set up an Erlang cluster using Distributed Erlang and start ebus on each node. When that happens, a PG2 group is created and each local ebus_ps_pg2_server joins it, in order to receive all broadcast messages and forward them to its local subscribers. This is a pretty nice and highly scalable architecture.
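
The broadcast-then-forward idea can be sketched with plain processes. This is only an illustration under simplifying assumptions: it runs on a single node, it replaces the process group with an explicit list of forwarders, and the names (forward_sketch, start_forwarder/1, broadcast/2, demo/0) are hypothetical rather than ErlBus functions.

```erlang
-module(forward_sketch).
-export([start_forwarder/1, broadcast/2, demo/0]).

%% Start a forwarder (one per node in a real cluster) that relays
%% every {forward, Msg} it receives to its local subscribers.
start_forwarder(LocalSubscribers) ->
    spawn(fun() -> loop(LocalSubscribers) end).

loop(LocalSubscribers) ->
    receive
        {forward, Msg} ->
            lists:foreach(fun(Pid) -> Pid ! Msg end, LocalSubscribers),
            loop(LocalSubscribers);
        stop ->
            ok
    end.

%% Send the message once to each forwarder in the group.
broadcast(Forwarders, Msg) ->
    lists:foreach(fun(F) -> F ! {forward, Msg} end, Forwarders),
    ok.

%% Two forwarders, each with the caller as its only local subscriber:
%% one broadcast should be delivered to the caller twice.
demo() ->
    Self = self(),
    F1 = start_forwarder([Self]),
    F2 = start_forwarder([Self]),
    ok = broadcast([F1, F2], hello),
    Results = [receive hello -> got after 1000 -> timeout end || _ <- [1, 2]],
    F1 ! stop,
    F2 ! stop,
    Results.
```

The point of the design is that a publisher sends one message per node, and the fan-out to individual subscribers happens locally on each node.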

Now let’s see some examples!

PubSub Example

Open an Erlang shell running ebus:

$ make shell

Within the Erlang shell:

% subscribe the current shell process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% publish a message
ebus:pub("foo", {foo, "hi"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).             
[{foo,"hi"}]

% check received message for self
ebus_proc:messages(self()).             
[{foo,"hi"}]

% unsubscribe self
ebus:unsub(self(), "foo").
ok

% publish other message
ebus:pub("foo", {foo, "hello"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).             
[{foo,"hi"},{foo,"hello"}]

% check received message for self (last message didn't arrive)
ebus_proc:messages(self()).             
[{foo,"hi"}]

% check subscribers (only Pid should be in the returned list)
ebus:subscribers("foo").
[<0.57.0>]

% check topics
ebus:topics().
[<<"foo">>]

% subscribe self to other topic
ebus:sub(self(), "bar").
ok

% check topics
ebus:topics().
[<<"bar">>,<<"foo">>]

% publish other message
ebus:pub("bar", {bar, "hi bar"}).
ok

% check received message for Pid (last message didn't arrive)
ebus_proc:messages(Pid).             
[{foo,"hi"},{foo,"hello"}]

% check received message for self
ebus_proc:messages(self()).             
[{foo,"hi"},{bar,"hi bar"}]

Now, let’s make it more fun: start two Erlang consoles. The first one:

$ erl -name node1@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

The second one:

$ erl -name node2@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

Next, we need to put these Erlang nodes in a cluster, so from either one of them, send a ping to the other:
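
A minimal sketch of that ping from node1, using the standard net_adm:ping/1 and nodes/0 (the node name is taken from the commands above):

```erlang
% on node1: try to connect to node2
net_adm:ping('node2@127.0.0.1').
% returns pong when node2 is reachable, pang otherwise

% list the nodes this node is currently connected to
nodes().
```

Once the nodes are connected, the handler setup that follows can be run on node1.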

% create a callback fun to use ebus_proc utility
CB1 = fun(Msg) ->
  io:format("CB1: ~p~n", [Msg])
end.
#Fun<erl_eval.6.54118792>

% other callback but receiving additional arguments,
% which may be used when message arrives
CB2 = fun(Msg, Args) ->
  io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])
end.
#Fun<erl_eval.12.54118792>

% use ebus_proc utility to spawn a handler
H1 = ebus_proc:spawn_handler(CB1).
<0.70.0>
H2 = ebus_proc:spawn_handler(CB2, ["any_ctx"]).
<0.72.0>

% subscribe handlers
ebus:sub(H1, "foo").
ok
ebus:sub(H2, "foo").
ok

Repeat the same steps on node2.
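
To give an idea of what a handler process does, here is a minimal sketch of a process that waits for messages and passes each one to a callback, with optional extra arguments. This is not the ebus_proc implementation; the module name handler_sketch is hypothetical, and the behaviour is only inferred from the calls shown above.

```erlang
-module(handler_sketch).
-export([spawn_handler/1, spawn_handler/2]).

%% Spawn a process that calls Fun(Msg) for every message it receives.
spawn_handler(Fun) ->
    spawn_handler(Fun, []).

%% Spawn a process that calls Fun(Msg, Arg1, ..., ArgN) for every
%% message it receives, where Args = [Arg1, ..., ArgN].
spawn_handler(Fun, Args) when is_function(Fun), is_list(Args) ->
    spawn(fun() -> loop(Fun, Args) end).

%% Receive loop: apply the callback to each incoming message, forever.
loop(Fun, Args) ->
    receive
        Msg ->
            apply(Fun, [Msg | Args]),
            loop(Fun, Args)
    end.
```

Because the handler is an ordinary process, it can be subscribed to a topic by pid just like the shell process in the earlier example.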

Once you have handlers subscribed to the same channel on both nodes, publish some messages from either node:

% publish message
ebus:pub("foo", {foo, "again"}).
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
ok

On the other node, you will see that those messages have arrived too:

CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"

Let’s check the subscribers from either Erlang console:

% returns local and remote subscribers
ebus:subscribers("foo").
[<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]

To learn more about it you can go HERE.
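
The subscriber list above mixes local and remote pids (the remote ones are printed with a non-zero first number). If you need to tell them apart in code, one option is the built-in node/1. The helper below is a hypothetical sketch, not part of ErlBus:

```erlang
-module(subs_sketch).
-export([split_by_node/1]).

%% Split a list of pids into {Local, Remote}, relative to the node
%% on which this function is called.
split_by_node(Pids) ->
    lists:partition(fun(Pid) -> node(Pid) =:= node() end, Pids).
```

For example, `{Local, Remote} = subs_sketch:split_by_node(ebus:subscribers("foo")).` would bind the local handlers to Local and the handlers living on the other node to Remote.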

So far, so good! Let’s continue!

Point-To-Point Example

The great thing here is that you don’t need anything special to implement point-to-point behavior. It is as simple as this:

ebus:dispatch("topic1", #{payload => "M1"}).

Let’s see an example:

% subscribe local process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% check that we have two subscribers
ebus:subscribers("foo").
[<0.57.0>,<0.38.0>]

% now dispatch a message (default dispatch fun and scope)
ebus:dispatch("foo", #{payload => foo}).
ok

% check that only one subscriber received the message
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[]

% dispatch with options
Fun = fun([H | _]) -> H end.
#Fun<erl_eval.6.54118792>
ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).
ok

% check again
ebus_proc:messages(self()).                                         
[#{payload => foo}]
ebus_proc:messages(Pid).                                            
[<<"M1">>]

Extremely easy, isn’t it?
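
As the example shows, a dispatch fun receives the list of subscribers and returns the one that should get the message. The sketch below shows the same idea in isolation, with a random pick instead of taking the head of the list. The module dispatch_sketch and its functions are hypothetical and only imitate the behaviour; they do not call ErlBus.

```erlang
-module(dispatch_sketch).
-export([random_pick/1, dispatch/3]).

%% Pick one subscriber at random from a non-empty list.
random_pick(Subscribers) when Subscribers =/= [] ->
    lists:nth(rand:uniform(length(Subscribers)), Subscribers).

%% Point-to-point delivery: choose exactly one subscriber with
%% PickFun and send the message only to that process.
dispatch(Subscribers, Msg, PickFun) when is_function(PickFun, 1) ->
    Pid = PickFun(Subscribers),
    Pid ! Msg,
    ok.
```

A fun such as `fun dispatch_sketch:random_pick/1` has the same shape as the Fun passed in the dispatch_fun option above, so it could presumably be used there to spread messages across subscribers.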

Summing Up

ErlBus is a simple and usable library that makes it much easier to build soft-real-time, highly scalable, messaging-based applications.

It was implemented on the basis of a remarkable and proven framework: the Phoenix Framework.

The next step is to create a coarse-grained interface on top of ErlBus, such as WebSockets, using Cowboy. So it may end up being something similar to Phoenix Channels, right? See WEST.

Finally, to learn more, check out ErlBus and ErlBus Examples.