Elixir websockets+Mongo+Redis (2)


Getting Started

In my previous post I explained what led me to implement a small OTP app that subscribes to JSON messages on a Redis channel and forwards that JSON to one or several clients.

I’ll walk through some of the code that you can find here.

The Elixir app is in this folder.

I will assume you already know how to get the app dependencies and how to compile them to get a working Elixir app.

The webpage folder has a webpage with bullet.js to give you a jumpstart.

To serve the webpage, if you have Python installed, you can run python -m SimpleHTTPServer inside the webpage folder (that is the Python 2 module; on Python 3 the equivalent is python3 -m http.server) and you will get it at http://localhost:8000.


Bootstrapping

As usual in a mix project you will have a mix.exs file and the application modules implemented under the lib dir.


mix.exs

def application do
  [
    mod: { WsPubSub, [] },
    applications: [
      :crypto,
      :compiler,
      :syntax_tools,
      :cowlib,
      :ranch,
      :cowboy,
      :bullet,
      :eredis,
      :bson,
      :mongodb,
      :exredis,
      :exjson,
      :jsex,
      :jsx
    ]
  ]
end

Lines 4 to 19 (the applications list): the applications named here are started along with the ws_pub_sub app.

Originally I was starting these apps in the start/2 function in lib/ws_pub_sub.ex, but this way feels more natural.
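For comparison, starting dependencies by hand looks roughly like the sketch below. It is not the code that used to live in start/2: it is written against a current Elixir release rather than the 0.12 series used in this post, the ManualStartSketch module is hypothetical, and :crypto stands in for the full dependency list.

```elixir
defmodule ManualStartSketch do
  # Hypothetical helper: starts each application (and whatever it depends on)
  # by hand, stopping at the first one that fails.
  def start_deps(apps) do
    Enum.reduce_while(apps, :ok, fn app, :ok ->
      case Application.ensure_all_started(app) do
        {:ok, _started} -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {app, reason}}}
      end
    end)
  end
end

# :crypto ships with Erlang/OTP, so this runs without extra dependencies.
:ok = ManualStartSketch.start_deps([:crypto])
```

Listing the applications in mix.exs removes the need for this kind of helper, which is why it feels more natural.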


ws_pub_sub.ex

defmodule WsPubSub do
  use Application.Behaviour

  def start(_type, _args) do
    # initialize cowboy
    setup_cowboy
    # Initializes the ETS to store connected users
    _table = ConnectionTable.init
    WsPubSub.Supervisor.start_link
  end

  defp setup_cowboy do
    my_dispatch = :cowboy_router.compile([
                    {:_, [{"/websocket", :bullet_handler, [{:handler, WsHandler}]}]}
                  ])
    # NOTE: to listen in port 80 you probably need to run the app as sudo
    # for demo purpose i'll start in port 8088
    {:ok, _} = :cowboy.start_http(
                 :http,
                 100,
                 [{:port, 8088}],
                 [{:env, [{:dispatch, my_dispatch}]}]
               )
  end
end

In line 6 (setup_cowboy) we initialize cowboy, registering the websocket handler module WsHandler, which implements the bullet_handler behaviour.

In line 8 we create an ETS table to hold the data (PID + session key) of all authenticated clients. The ConnectionTable.ex module defines a few functions that make working with ETS tables more Elixir-like and hide the calls to Erlang functions. It lets us call ConnectionTable.insert(key, value) instead of :ets.insert(@table_id, {key, value}).
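To give an idea of what such a wrapper can look like, here is a minimal sketch. It is not the ConnectionTable.ex from the repo: the module name, the table name and the lookup/delete functions are my own assumptions, and it targets a current Elixir release.

```elixir
defmodule ConnectionTableSketch do
  # Hypothetical stand-in for ConnectionTable; the table name is an assumption.
  @table_id :connection_table_sketch

  # Creates a named, public table so any process can read and write it.
  def init do
    :ets.new(@table_id, [:set, :public, :named_table])
  end

  def insert(key, value) do
    :ets.insert(@table_id, {key, value})
  end

  # Returns {:ok, value} when the key is present, :not_found otherwise.
  def lookup(key) do
    case :ets.lookup(@table_id, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :not_found
    end
  end

  def delete(key) do
    :ets.delete(@table_id, key)
  end
end

ConnectionTableSketch.init()
ConnectionTableSketch.insert("some-session-key", self())
```

Keep in mind that an ETS table belongs to the process that created it and disappears when that process exits.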



Websockets

The websocket handler is implemented in lib/ws_pub_sub/ws_handler.ex. Authorizing via Mongo could definitely be extracted to its own module, but I opted to keep the Mongo lookup here to make browsing easier.


ws_handler.ex

def init(_transport, req, _opts, _active) do
  _ = :erlang.send_after(@period, self, :refresh)
  {qs, req2} = :cowboy_req.qs(req)
  # Read the querystring from request
  state = State[guid: qs]
  # check if key exists
  connect(qs)
  {:ok, req2, state}
end

defp connect(key) do
  case mongo_auth(key) do
    :ok ->
      # Key exists. User is allowed to receive updates.
      ConnectionTable.insert(key, :erlang.pid_to_list(self))
      # Publish in Redis:
      # You can notify on a given channel that a user was added
      # to the connected users table
      :global.whereis_name(:pubsub_exredis_client)
        |> Exredis.Api.publish("#{@redis_ws_in_chn}", "#{key}")
    :not_found ->
      #TODO: notify user, log attempt etc...
      :not_found
  end
end

Line 3: We get the querystring from cowboy_req.

Line 7: We pass the querystring on to check with Mongo whether the given key exists.

In lib/ws_pub_sub/ws_handler.ex you will find the Mongo connection and query code under mongo_auth(key). I think it’s easy to follow so I’ll skip it. If you have any doubts, open an issue on the GitHub repo.

If the key is found we add the user’s key and PID to the connection table, and in line 19 we publish a message on a given Redis channel to announce that a new user was added to the connected users.
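Note that the PID goes into the table as a charlist (the result of :erlang.pid_to_list). A small sketch of the round trip, assuming the stored value is later converted back with :erlang.list_to_pid before sending to it; the {:push, payload} message shape is made up for the example:

```elixir
# Convert the current process PID to a charlist, as the handler does
# before storing it in the connection table.
stored = :erlang.pid_to_list(self())

# Later, turn the stored charlist back into a PID and send it a message.
pid = :erlang.list_to_pid(stored)
send(pid, {:push, "hello"})

# The process that owns the PID (here: this same process) receives it.
received =
  receive do
    {:push, payload} -> payload
  after
    100 -> :timeout
  end
```

The Erlang documentation describes list_to_pid as a debugging aid, so storing the PID term itself in ETS is probably the cleaner option.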

That takes us into the RedisPubSub module.

This module implements the gen_server behaviour and is added to the app supervisor. This way we can recover from any unexpected failure and keep on listening to messages!
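The recovery part is easy to try out in isolation. The sketch below is not the RedisPubSub module: ListenerSketch is a hypothetical GenServer with an empty state, written for a current Elixir release, and it only shows the supervisor bringing a killed process back.

```elixir
defmodule ListenerSketch do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # The real module would connect to Redis and subscribe here.
    {:ok, %{}}
  end
end

# one_for_one: only the crashed child is restarted.
{:ok, _sup} = Supervisor.start_link([ListenerSketch], strategy: :one_for_one)

first_pid = Process.whereis(ListenerSketch)
ref = Process.monitor(first_pid)

# Simulate an unexpected failure.
Process.exit(first_pid, :kill)

# Wait until the process is really gone.
receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# Give the supervisor a moment to restart the child.
Process.sleep(100)
second_pid = Process.whereis(ListenerSketch)
```

After the restart the registered name points to a new process, and because init/1 runs again the subscription would be set up again as well.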


redis_pub_sub.ex

def init(_options \\ []) do
  client_sub = Exredis.Sub.start(@redis_host, @redis_port)
  client = Exredis.start(@redis_host, @redis_port)
  # Register the PubSub client so it is accessible to publish connection
  # and disconnection notifications
  :global.register_name(@name, client)
  _pid = Kernel.self
  state = State[client: client, client_sub: client_sub]
  # NOTE: send() is defined in another module (MsgPusher)
  # the function is "registered" as the handler for any message arriving to
  # redis @notification_channel
  client_sub |>
    Exredis.Sub.subscribe("#{@notification_channel}", fn(msg) -> MsgPusher.send(msg) end)
  {:ok, state}
end

In this particular file I would like to point out lines 12 and 13.

We subscribe to a Redis channel, and the function we pass to be executed every time a message arrives, fn(msg) -> MsgPusher.send(msg) end, simply invokes the send/1 function defined in the MsgPusher module.

If you want to see some JSON arriving in your browser, open a Redis console (redis-cli).
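I won’t reproduce MsgPusher here, but the routing idea can be sketched as follows. Everything in it is an assumption made for illustration: the module name, the {:push, data} message, a plain map in place of the ETS connection table, and a message that was already decoded from JSON into a map with "recipients" and "data" keys.

```elixir
defmodule MsgPusherSketch do
  # Hypothetical stand-in for MsgPusher. `connections` maps a session key
  # to the PID of that client's websocket handler.
  # Returns the keys the data was actually sent to.
  def push(%{"recipients" => recipients, "data" => data}, connections) do
    Enum.filter(recipients, fn key ->
      case Map.fetch(connections, key) do
        {:ok, pid} ->
          send(pid, {:push, data})
          true

        :error ->
          # Recipient is not connected; skip it.
          false
      end
    end)
  end
end

# Only "key-a" is connected (to this very process).
connections = %{"key-a" => self()}

message = %{
  "recipients" => ["key-a", "key-b"],
  "data" => %{"entry" => %{"id" => "123"}}
}

delivered = MsgPusherSketch.push(message, connections)
```

Recipients without a live connection are simply skipped, so publishing to a user who is offline is harmless.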

Inside redis-cli enter:

PUBLISH "my_channel" "{\"recipients\":[\"00721b2a-046c-4ecc-a5df-5f808cc6c58f\"],\"data\":{\"entry\":{\"id\":\"123\",\"comments\":0,\"tags\":0}}}"

As the included example webpage indicates, the websocket connection will only be established (authorized) if you have an entry in MongoDB with _id: 00721b2a-046c-4ecc-a5df-5f808cc6c58f

The default database is myDB and the collection myCollection.

Elixir websockets+Mongo+Redis (1)

A few months ago a fellow developer was architecting a new app (soon to be launched) and he had a problem to solve:

Provide realtime updates to one or several devices after an event triggered by one of those devices

It had to be fault tolerant, performant… well, business as usual !

At the time I was quite interested in Erlang/Elixir, and I offered to prototype a solution. It just felt like the right tool for the job.

All in all it was not that different from the canonical “hello world” of the Erlang VM languages … the chatroom example.

After a few days trying to find documentation and examples on websocket use I was able to prototype something in Erlang with cowboy.

Then things changed a bit. Prior to websocket connection “validation” a user should have a valid key stored in a database. And there would be a Redis channel to publish messages that had to be routed to a subset of the registered connected users.

It was time to refactor.

I opted to use Elixir and I also added bullet [1] to my previous cowboy setup. I also had to integrate a few libraries to:

  • parse and output json
  • connect to a database to verify identity
  • subscribe to a Redis channel and listen to messages

I knew it would be a risk to choose a language which is still being actively developed (currently at version 0.12.4), but it just felt right!

In my next post I’ll dive into implementation details.


  1. “Bullet is a Cowboy handler and associated Javascript library for maintaining a persistent connection between a client and a server.”

Changes

They always say time changes things, but you actually have to change them yourself.
Andy Warhol


Change #1

So I’ve finally decided to change things, since time itself wouldn’t move my blog over to GitHub.

Initially I started the blog to host Portuguese translations of Burt Beckwith’s weekly compilation of Grails-related info.

It’s time to blog more often.

Change #2

But in the last 3 years a lot has changed. I went from Grails to Rails, deployed a complex application, and those steps led me here …

Here is where ?

Well here is deciding to learn a functional programming language.

Here is deciding to keep on walking, one step at a time, towards the path of lesser complexity.

Here is Erlang and Elixir.

Change #3

I’ve also decided it was time to change from Sublime Text to something more terse. And I did. After a couple of days I uninstalled Sublime and I’m now a happy vim learner.