Real-Time Command Line Applications with Action Cable and Thor

7 minute read

If you build a Rails application that has any kind of real-time feature, chances are you use Action Cable.

Action Cable allows you to build nice things such as feeds that automatically refresh as new content is published, or editors that display a list of users currently working on a document. Under the hood, it uses Websockets to stream changes to clients as they happen.

The most commonly used client is, of course, the web browser. But that doesn’t mean you can’t leverage Action Cable when using other kinds of clients - such as command line applications.

Imagine a command line client that triggers some long-running job on the server. Wouldn’t it be nice to give users live updates on how that job is advancing?

In this guide, I’ll show you how to build exactly that. We’ll create a command line app that connects to an Action Cable server, triggers a lengthy background job, and then displays live updates about its progress.

A (moving?) picture tells more than a thousand words.

demo

This is a long post. If you have no patience for words, you can find the source code of the result on GitHub.

The Server

To start things off let’s create a new Rails application. We don’t need most of Rails’ functionality in this guide, so we can skip a lot of things.

rails new actioncable-cli \
    --skip-action-mailer \
    --skip-action-mailbox \
    --skip-action-text \
    --skip-active-job \
    --skip-active-record \
    --skip-active-storage \
    --skip-javascript \
    --skip-jbuilder \
    --skip-spring \
    --skip-test \
    --skip-system-test \
    --skip-webpack-install \
    --skip-turbolinks

We will create our command line app later. First, we have to make some changes to the Action cable connection. Usually, clients provide information about the currently logged-in user, for example through session cookies, which then serves as a connection identifier. See the official connection docs.

Our command line app offers no such thing. We could add some sort of authentication mechanism, but to keep things simple we won’t. We will use a simple UUID to identify connections.

Open and modify app/channels/application_cable/connection.rb:

module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :client_id

    def connect
      self.client_id = request.params[:client_id]
    end
  end
end

Next, create a worker channel, through which we’ll later publish updates. Create a new file app/channels/worker_channel.rb:

class WorkerChannel < ApplicationCable::Channel
  def subscribed
    stream_for "client_#{client_id}"
  end

  def unsubscribed
    stop_all_streams
  end
end

Because we’ll be connecting from the command line, we’ll have to disable some security measures Rails enables by default. Uncomment this line in development.rb:

config.action_cable.disable_request_forgery_protection = true

Now that we have the connection and channel set up, let’s create a background job. We’ll be using Sidekiq, so add this to your Gemfile:

gem 'sidekiq', '~> 6.1'

We must also make sure that Redis is up and running because Sidekiq relies on that for managing background workers. If you use docker-compose, add the following to docker-compose.yml:

version: "3"
services:
  actioncable-cli-redis:
    image: redis
    container_name: redis
    ports:
      - 6379:6379

Next, create a new worker in app/workers. It won’t be doing any actual work, mostly it will be taking a nap.

class Worker
  include Sidekiq::Worker

  def perform
    steps = 5
    (1..steps).each do |progress|
      sleep(rand(1..3))
      Sidekiq.logger.info("Step #{progress} for client")
    end
  end
end

To offer a way to start the background job we just created, add a new controller with the following contents:

class WorkersController < ApplicationController
  def start
    Worker.perform_async
    head(:ok)
  end
end

Don’t forget to also add a new route to your routes.rb!

get '/workers/start', to: 'workers#start'

This is a good point to stop and check how badly broken everything is :crossed_fingers:

Start your Rails app, Sidekiq, and start the worker. If all is in order, you should see your worker writing to the Sidekiq logs.

rails start
bundle exec sidekiq
# Send a request to trigger the worker
curl "http://localhost:3000/workers/start"

Success? Then on to the next part.

The Command Line App

Our command line application will offer just a single command - one that starts the worker. Thor is a simple way to create command line apps, and it’s bundled with Rails, so we’ll be using that to implement that command.

Create worker.thor in your lib/tasks directory:

require 'thor'

class Worker < Thor
  include Thor::Actions

  desc 'start', 'Start a worker process'

  def start
    puts 'Hello there!'
  end
end

You can test your command using bundle exec thor worker:start.

To receive live updates using Websockets we’ll need a Websocket client. I used async-websocket. Add it to your Gemfile:

gem 'async-websocket', '~> 0.17'

Then update your command to connect to the server. Note that we generate a UUID to identify the connection. Remember that we adapted the Action Cable connection to make use of this client_id.

require 'thor'
require 'securerandom'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

class Worker < Thor
  include Thor::Actions

  desc 'start', 'Start a worker process'

  def start
    @client_id = SecureRandom.uuid
    url = "ws://localhost:3000/cable?client_id=#{@client_id}"
    Async do |_|
      endpoint = Async::HTTP::Endpoint.parse(url)
      Async::WebSocket::Client.connect(endpoint) do |connection|
        while (message = connection.read)
          puts message
        end
      end
    end
  end
end

Run the command and check the server logs. You should see that a connection has been established, and should start receiving ping messages on the command line.

$ bundle exec thor worker:start
{:type=>"welcome"}
{:type=>"ping", :message=>1617639988}
...

Now we need to subscribe to the worker channel. As soon as the subscription was confirmed, we are ready to receive messages. We can then start the worker.

Adapt the Thor command as follows:

require 'thor'
require 'securerandom'
require 'net/http'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

class Worker < Thor
  include Thor::Actions

  desc 'start', 'Start a worker process'

  def start
    @client_id = SecureRandom.uuid
    url = "ws://localhost:3000/cable?client_id=#{@client_id}"
    Async do |_|
      endpoint = Async::HTTP::Endpoint.parse(url)
      Async::WebSocket::Client.connect(endpoint) do |connection|
        while (message = connection.read)
          on_receive(connection, message)
        end
      end
    end
  end

 private

  def on_receive(connection, message)
    handle_connection_message(connection, message)
  end

  def handle_connection_message(connection, message)
    type = message[:type]
    case type
    when 'welcome'
      on_connected(connection)
    when 'confirm_subscription'
      on_subscribed
    else
      puts message
    end
  end

  def on_connected(connection)
    content = { command: 'subscribe', identifier: { channel: 'WorkerChannel' }.to_json }
    connection.write(content)
    connection.flush
  end

  def on_subscribed
    Net::HTTP.start('localhost', 3000) do |http|
      http.get("/workers/start?client_id=#{@client_id}")
    end
  end
end

All that is missing is to stream updates from the worker to the connected clients. We’ll have to make some small changes to our worker and worker controller:

class Worker
  include Sidekiq::Worker

  def perform(client_id)
    steps = 5
    WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_started, total: steps)
    (1..steps).each do |progress|
      sleep(rand(1..3))
      Sidekiq.logger.info("Step #{progress} for client #{client_id}")
      WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_progress, progress: progress)
    end
    WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_done)
  end
end
class WorkersController < ApplicationController
  def start
    Worker.perform_async(params[:client_id])
    head(:ok)
  end
end

Note that the worker uses the client_id to publish messages to the correct clients. We publish messages when the worker has started, when there is progress, and when the worker has finished.

We’ll update the command line app to handle these messages. Let’s also add ruby_progressbar so we can display the progress to the user.

Add this to your Gemfile.

gem 'ruby-progressbar', '~> 1.11'

Then update the Thor command once again. In the end, it should look like this:

require 'thor'
require 'securerandom'
require 'net/http'
require 'async'
require 'async/io/stream'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'ruby-progressbar'

class Worker < Thor
  include Thor::Actions

  desc 'start', 'Start a worker process'

  def start
    @client_id = SecureRandom.uuid
    url = "ws://localhost:3000/cable?client_id=#{@client_id}"
    Async do |_|
      endpoint = Async::HTTP::Endpoint.parse(url)
      Async::WebSocket::Client.connect(endpoint) do |connection|
        while (message = connection.read)
          on_receive(connection, message)
        end
      end
    end
  end

  private

  def on_receive(connection, message)
    if message[:type]
      handle_connection_message(connection, message)
    else
      handle_channel_message(connection, message)
    end
  end

  def handle_connection_message(connection, message)
    type = message[:type]
    case type
    when 'welcome'
      on_connected(connection)
    when 'confirm_subscription'
      on_subscribed
    end
  end

  def handle_channel_message(connection, message)
    message = message[:message]
    type = message[:type]
    case type
    when 'worker_started'
      total = message[:total]
      @bar = ProgressBar.create(title: 'Worker Progress', total: total, format: '%t %B %c/%C %P%%')
    when 'worker_progress'
      @bar.increment
    when 'worker_done'
      connection.close
    end
  end

  def on_connected(connection)
    content = { command: 'subscribe', identifier: { channel: 'WorkerChannel' }.to_json }
    connection.write(content)
    connection.flush
  end

  def on_subscribed
    Net::HTTP.start('localhost', 3000) do |http|
      http.get("/workers/start?client_id=#{@client_id}")
    end
  end
end

The most important change here is the addition of handle_channel_message where we handle the messages we receive from the worker to create and update the progress bar.

Before wrapping up, we need to make one final change. Update cable.yml to use Redis in development. We need to do this so that our Sidekiq process knows about subscriptions made using the main process.

development:
  adapter: redis
  url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/0" } %>

The default mechanism for managing Action Cable connections in development is async, which uses in-memory structures. These are accessible only by the current process. That is no good when multiple processes need to utilize the same connections.

Restart your Rails server and, for good measure, Sidekiq process if you haven’t already and run the worker command:

bundle exec thor worker:start

Just the beginning…

This guide is done, but the story of ActionCable and command line applications isn’t. Updating a progress bar is nice and all, but it is only scratching the surface.

There is much more to explore. How about streaming process logs live to clients? Or what about streaming user inputs directly to the server?

Anything is possible - you just have to try it! :woman_scientist:

Updated: