Over a million developers have joined DZone.

Surrogate WebSockets Alongside Rails

Scaling WebSockets in Ruby can be a tricky proposition, even with new libraries. We have a look at how to overcome this by leveraging distributed systems' rockstar, Elixir.

· Web Dev Zone

Start coding today to experience the powerful engine that drives data application’s development, brought to you in partnership with Qlik.

ActionCable is coming to Rails 5 and brings with it the promise of using WebSockets directly in Rails.

Ruby has a notoriously bad concurrency story, and that certainly extends into the realm of WebSockets and pubsub. ActionCable may be suitable for a small number of authenticated sessions, but scaling persistent connections to thousands or tens-of-thousands won’t be easy. That’s all right, the beauty of the internet is that you can mix and match technologies as you see fit!

There are plenty of other languages that your application can lean on to achieve real-time over WebSockets. In the recent past, you may have reached to Node.js for this kind of work, but there is a better option.

Pubsub is by its nature a distributed system, and it’s no secret that Erlang is a superstar in the distributed systems realm. Rather than reach for Erlang directly, we’ll leverage Elixir’s productive tooling to build out a WebSocket microservice.

By using a surrogate microservice, your Rails application remains the authority on business logic. It still handles all database interactions, sending email, and plugs away at the other work Rails excels at—it just doesn’t hold thousands of connections.

Gluing PubSub Together

You may recall from my previous article on service communication with pubsub that it is very easy to publish messages from Ruby. As a refresher, here is the simple Publisher class:

require 'redis'

class Publisher
  def publish(channel, message)
    Redis.current.publish(channel, message)

The Rails application will publish score updates whenever data changes:

# from a controller or a background worker

publisher.publish('teams-123-scores', score.to_json)

Publishing is the easy part though! So long as you have a consistent naming scheme for channels and serialize your messages, it is very straightforward. What about the Elixir side?

Getting Started With Compatibility

In the Elixir world, Phoenix is an obvious choice for hosting surrogate WebSocket connections. There are two pubsub implementations broadly available for Phoenix: Redis and PG2 (using Erlang’s distributed named process groups).

As it turns out, both modules rely on Erlang encoded terms, so neither are conducive to communication outside the Erlang ecosystem. Developing an alternate pubsub module for Phoenix is beyond the scope of this article. Instead, we’ll build a simple system that allows Ruby and Elixir to communicate over pubsub with a minimal API.

Setting Up Elixir and Redis PubSub

From here on, I’ll assume you are familiar with Elixir, the mix build tool, and testing with ExUnit. Our first step is to create a surrogate project:

mix new surrogate && cd surrogate

Now we’ll install the Redis client. There are a few different Redis clients in the Erlang world, but we’ll be using the native Elixir client Redix. It is easy to configure, easy to test, and wonderfully written. Open the generated mix.exs and modify the deps:

defp deps do
  [{:redix, "~> 0.3"}]

Now, retrieve the dependencies:

mix deps.get

That’s enough to get the project set up and explore pubsub with a small test. The only functions our module needs are subscribe/1, unsubscribe/1, and possibly broadcast/2. That makes it consistent with the Phoenix.PubSub API. Our test gets started by testing subscribe/1:

defmodule SurrogateTest do
  use ExUnit.Case

  alias Surrogate.PubSub

  test "subscribe/1 links to a topic" do
    {:ok, _pid} = PubSub.start_link

    :ok = PubSub.subscribe("topic.1")
    :ok = PubSub.subscribe("topic.2")

    {:ok, other_conn} = Redix.start_link
    {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello))
    {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.2 world))

    assert_receive "hello"
    assert_receive "world"

The test first establishes a link to a new surrogate server and then uses the subscribe/1 API to register the test process for published messages. Next it starts another link directly through Redix and publishes messages on both of the subscribed topics. Finally, it verifies that both of the simple hello and world messages were received by the test process.

Actually defining the Surrogate.PubSub module is a bit more involved. Retaining a list of subscribed processes is essential, as every connection from the web is a separate process. In order to retain state, we must use an Agent or a GenServer. As our module combines state and functions to manipulate that state, we will use the GenServer behaviour.

defmodule Surrogate.PubSub do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, [name: __MODULE__])

  ## Callbacks

  def init(opts) do
    {:ok, conn} = Redix.PubSub.start_link(opts)

    {:ok, %{conn: conn, subs: %{}}}

The GenServer behavior is mixed in via use, and we then define the start_link/1 convenience function. As soon as the process is started, it links itself to Redix.PubSub and retains a connection to Redis. Now we’re ready to add the subscribe/1 API method.

def subscribe(topic) do
  GenServer.call(__MODULE__, {:subscribe, topic})

## Callbacks

def handle_call({:subscribe, topic}, {pid, _}, %{conn: conn, subs: subs} = state) do
  :ok = Redix.PubSub.subscribe(conn, topic, self())

  tops = Map.get(subs, pid, MapSet.new)
  subs = Map.put(subs, pid, MapSet.put(tops, topic))

  {:reply, :ok, %{state | subs: subs}}

Subscribe issues a handle_call/3, part of the GenServer behaviour, to synchronously subscribe the caller process to a topic. Simultaneously, it adds the calling process (seen here as {pid, _}) to the server’s state.

Every process is a key in the subs map, and its value is a set of all the topics the process is subscribed to. This mapping is critical for receiving published messages and managing unsubscribes later.

Running the tests now doesn’t quite work, because the Redix.PubSub module sends out-of-band messages back to the pubsub process, and they aren’t being handled. To handle these out-of-band messages, we have to use the handle_info/2 callback:

def handle_info({:redix_pubsub, :message, message, topic}, %{subs: subs} = state) do
  for {pid, topics} <- subs do
    if MapSet.member?(topics, topic), do: send pid, message

  {:noreply, state}
def handle_info(_msg, state) do
  {:noreply, state}

There! Now the module can properly handle subscriptions and forward messages to the subscribed processes.

mix test test/surrogate_test.exs
Compiled lib/surrogate.ex

Finished in 0.1 seconds (0.06s on load, 0.04s on tests)
1 test, 0 failures

Randomized with seed 117850

What Else Does PubSub Need?

The subscribe/1 function is almost enough for one-way communication between the Rails app and the Surrogate. There isn’t any need to support broadcast/2 (which would send messages back upstream), leaving unsubscribe/1.

When the current process is unsubscribed, it should no longer receive any forwarded messages. The implementation of unsubscribe/1 is simple, effectively subscribe/1 in reverse. First, add a new test case:

test "unsubscribe/1 unlinks a topic" do
  {:ok, _pid} = PubSub.start_link

  :ok = PubSub.subscribe("topic.1")
  :ok = PubSub.unsubscribe("topic.1")

  {:ok, other_conn} = Redix.start_link
  {:ok, _} = Redix.command(other_conn, ~w(PUBLISH topic.1 hello))

  refute_receive "hello"

The test refutes that any message is broadcast after unsubscribing to "topic.1". Of course, this initially fails—now we make it pass:

def unsubscribe(topic) do
  GenServer.call(__MODULE__, {:unsubscribe, topic})

## Callbacks

def handle_call({:unsubscribe, topic}, {pid, _}, %{subs: subs} = state) do
  tops = Map.get(subs, pid, MapSet.new)
  subs = Map.put(subs, pid, MapSet.delete(tops, topic))

  {:reply, :ok, %{state | subs: subs}}

Again we rely on the power of call, which includes the calling processes’ pid in the second argument. The handler finds the entry for the calling pid and removes the specified topic from its list. When future messages are published, that topic won’t be found and the message won’t be forwarded.

This approach is simple for illustration purposes, but it is quite naive and leaves behind dangling references to processes. Production grade implementations, such as Phoenix.PubSub, run periodic GC to clean up after unsubscribes.

Stitching Into WebSockets

Our connections are built on top of Plug, which comes with an adapter for the Cowboy server. Every HTTP connection that comes to Cowboy is wrapped in a separate process. That includes WebSocket connections, which are initiated as HTTP connections and then upgraded to WebSockets.

That’s perfect for our pubsub setup! Once a connection is upgraded, the server simply waits for topic subscriptions and forwards them on to the Surrogate module.

Add both plug and cowboy to deps in mix.exs, ensure they are started as applications, and define Surrogate as the application to start.

def application do
  [mod: {Surrogate, []},
   applications: [:cowboy, :plug, :logger]]

defp deps do
  [{:redix, "~> 0.3"},
   {:plug, "~> 1.0"},
   {:cowboy, "~> 1.0"}]

In order to treat Surrogate as an application, it needs a base module that uses the Application behavior. Forgive me if what follows is a little like the how to draw an owl process. There are a lot of details in application setup that are outside the scope of this article, and it is necessary to get a real server.

defmodule Surrogate do
  use Application

  alias Plug.Adapters.Cowboy

  def start(_, _) do
    import Supervisor.Spec

    children = [
    Cowboy.child_spec(:http, Surrogate.Server, [], [dispatch: dispatch]),
    worker(Surrogate.PubSub, [])

    opts = [strategy: :one_for_one, name: Surrogate.Supervisor]

    Supervisor.start_link(children, opts)

  defp dispatch do
    [{:_, [{"/ws", Surrogate.Socket, []},
        {:_, Cowboy.Handler, {Surrogate.Server, []}}]}]

The Surrogate module is defined as a proper Erlang app, which supervises the pubsub and cowboy workers. The supervision ensures that any time a worker crashes a new one is started up in its place. Notice that the number of modules has expanded; now there are also Surrogate.Server and Surrogate.Socket modules. These will handle our incoming HTTP and WebSocket connections, respectively.

First, the Surrogate.Server module is defined as a simple plug that always returns an "OK" text response.

defmodule Surrogate.Server do
  import Plug.Conn

  def init(_opts) do

  def call(conn, _opts) do
    |> put_resp_content_type("application/text")
    |> send_resp(200, "OK")

We can verify that the server is working now by starting the application up with iex -S mix and visiting localhost:4000. If everything went according to plan, you’ll see a friendly "OK" in your browser. Now the final module needed to relay messages, Surrogate.Socket:

defmodule Surrogate.Socket do
  @behaviour :cowboy_websocket_handler

  def init(_, _req, _opts) do
    {:upgrade, :protocol, :cowboy_websocket}

  ## Callbacks

  def websocket_init(_type, req, _opts) do
    {:ok, req, %{}, 60_000}

The module uses the cowboy_websocket_handler behaviour, which requires callback handlers just like a GenServer. Upgrading to a WebSocket is handled by websocket_init/3, which is called whenever a new connection comes in to localhost:4000/ws. Once connected, we can begin sending and receiving messages through websocket_handle/3.

alias Surrogate.PubSub

def websocket_handle({:text, "ping"}, req, state) do
  {:reply, {:text, "pong"}, req, state}
def websocket_handle({:text, "subscribe|" <> topic}, req, state) do

  {:ok, req, state}
def websocket_handle({:text, "unsubscribe|" <> topic}, req, state) do

  {:ok, req, state}
def websocket_handle({:text, _}, req, state) do
  {:ok, req, state}

Each handler will pattern match on the incoming message, allowing the server to selectively handle subscribe and unsubscribe messages accordingly. At this point, the server can handle incoming messages, but it is unable to relay subscribed messages being returned from pubsub. As with all out-of-band messages to a server, those are handled with an _info/3 callback:

def websocket_info(message, req, state) do
  {:reply, {:text, message}, req, state}

Here the message is the exact value being published to the channel, and it is passed along to the WebSocket connection directly. Finally, we can test the round trip between Ruby, Elixir, and a web browser. Restart the surrogate server interactively with iex -S mix, reload the page, and open up the JavaScript console:

let ws = new WebSocket("ws://localhost:4000/ws")
ws.onmessage = (message) => console.log(message.data)

All messages published to topic. are fed to the console, whether they come from Ruby, a Redis CLI instance, or the Elixir shell. For example, when a 'score' object is serialized and published from Ruby, this is logged as data in the console:

'{"id":12345,"value":"new score"}'

Monolith This Isn’t

The system we’ve built is more complex than a monolith, but it will scale simply and perform reliably. It will easily handle thousands of topics and thousands of connections, even on the lightest weight hardware. Line for line, it may outweigh a simple naive Node.js implementation, but it is entirely fault tolerant right out of the box. That’s the benefit we get from building on top of Erlang.

This system is real-time

This is not the "beautiful monolith" approach because that simply doesn’t exist when you enter the realm of distributed systems. It relies on an external database to broker communication, which is a service in itself. When external dependencies are already at play, it is wise to leverage languages and frameworks that are perfectly suited to the task.

Create data driven applications in Qlik’s free and easy to use coding environment, brought to you in partnership with Qlik.

ruby,rails,web dev,elixir,erlang,websockets,scale

Published at DZone with permission of Parker Selbert, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}