AlmaConnect

Hi, we're AlmaConnect.

AlmaConnect provides an alumni platform for your institute. To request access or check your college's network, visit www.almaconnect.com.

Implementing PUB/SUB in Rails using ActiveSupport::Notifications

This article is about implementing a simple publisher/subscriber model in Rails using ActiveSupport::Notifications (ASN). We explore problem areas where PUBSUB can improve modularity and reduce coupling, and then upgrade our basic implementation to cater to common use cases.

A little about PUB/SUB

According to Wikipedia:

In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers. Instead, published messages are characterized into classes, without knowledge of what, if any, subscribers there may be. Similarly, subscribers express interest in one or more classes, and only receive messages that are of interest, without knowledge of what, if any, publishers there are.

Rails is a good framework for proofs of concept and small applications; it promotes convention over configuration. As an application grows, however, it becomes difficult to maintain. This is largely because Rails promotes a coupled architecture through its fat model, skinny controller approach. Developers need to adopt new patterns and create new conventions to keep projects moving at pace and in shape. PUBSUB can help us resolve some of the problems we face often.

Implementing a basic publisher & subscriber: let's get it on

The basic requirements of our PUBSUB are:

  1. Publishers should be able to publish named events with a payload
  2. Subscribers should be able to subscribe to events matching a name, and should receive the payload object with every event

Such a system is already bundled with Rails: it's ASN. ASN is used throughout Rails internally for instrumentation. Its API is simple yet powerful enough that we can build our PUBSUB implementation on top of it.

Publisher

    # app/pub_sub/publisher.rb
    module Publisher
      extend self

      # delegate to ActiveSupport::Notifications.instrument
      def broadcast_event(event_name, payload={})
        if block_given?
          ActiveSupport::Notifications.instrument(event_name, payload) do
            yield
          end
        else
          ActiveSupport::Notifications.instrument(event_name, payload)
        end
      end
    end

This is as simple as it gets. The publisher broadcasts an event with a payload. An optional block can be passed; if it is, the published event also contains the execution time of the block and the exception it raised, if any. All of this block-related goodness comes from ASN, the underlying implementation. Interesting!

    if user.save
      # publish event 'user.created', with payload {user: user}
      Publisher.broadcast_event('user.created', user: user)
    end

    def create_user(params)
      user = User.new(params)

      # publish event 'user.created', with payload {user: user}, using block syntax;
      # the event now also carries the block's duration and any exception it raised
      Publisher.broadcast_event('user.created', user: user) do
        user.save!
        # do some more important stuff here
      end
    end

Subscriber

    # app/pub_sub/subscriber.rb
    module Subscriber
      # delegate to ActiveSupport::Notifications.subscribe
      def self.subscribe(event_name)
        if block_given?
          ActiveSupport::Notifications.subscribe(event_name) do |*args|
            event = ActiveSupport::Notifications::Event.new(*args)
            yield(event)
          end
        end
      end
    end

A subscriber can subscribe to a named event, and the block passed to it receives an instance of ActiveSupport::Notifications::Event as its parameter.

    # subscriber example usage
    Subscriber.subscribe('user.created') do |event|
      error = "Error: #{event.payload[:exception].first}" if event.payload[:exception]
      puts "#{event.transaction_id} | #{event.name} | #{event.time} | #{event.duration} | #{event.payload[:user].id} | #{error}"
    end

Use cases where PUB/SUB can be helpful

These are some problems we face in our routine work that PUBSUB can solve:

Mail deliveries coupled with code

Problem: Any user-centric application has some kind of notifications at work. They generally start life in an after_save model callback and slowly move out of there into a service layer. Let's explore how we can handle welcome emails using PUBSUB.

Solution: Since we are adding this pattern to our application, let's bake modularity into its core usage. For the time being, a simple string namespace should be good enough.

    # app/pub_sub/publisher.rb
    # (extend self is dropped: the instance method now delegates to self.class,
    # so calling Publisher.broadcast_event on the module itself would fail)
    module Publisher
      extend ::ActiveSupport::Concern

      included do
        # add support for namespace, one class - one namespace
        class_attribute :pub_sub_namespace

        self.pub_sub_namespace = nil
      end

      # delegate to class method
      def broadcast_event(event_name, payload={})
        if block_given?
          self.class.broadcast_event(event_name, payload) do
            yield
          end
        else
          self.class.broadcast_event(event_name, payload)
        end
      end

      module ClassMethods
        # prefix the event name with the namespace, then delegate to ASN
        def broadcast_event(event_name, payload={})
          event_name = [pub_sub_namespace, event_name].compact.join('.')
          if block_given?
            ActiveSupport::Notifications.instrument(event_name, payload) do
              yield
            end
          else
            ActiveSupport::Notifications.instrument(event_name, payload)
          end
        end
      end
    end

The publisher underwent some changes. It can now be included in a class, and once a namespace is set, you are all set to publish events in that namespace. Let's start broadcasting the event user_signed_up in the registration namespace.

    # app/pub_sub/publishers/registration.rb
    module Publishers
      class Registration
        include Publisher

        self.pub_sub_namespace = 'registration'
      end
    end

    # broadcast event
    if user.save
      Publishers::Registration.broadcast_event('user_signed_up', user: user)
    end

The publisher looks good; let's add some Ruby goodness to the subscribers too.

    # app/pub_sub/subscribers/base.rb
    module Subscribers
      class Base
        class_attribute :subscriptions_enabled
        attr_reader :namespace

        def initialize(namespace)
          @namespace = namespace
        end

        # attach public methods of the subscriber to events in the namespace
        def self.attach_to(namespace)
          subscriber = new(namespace)
          subscriber.public_methods(false).each do |event|
            ActiveSupport::Notifications.subscribe("#{namespace}.#{event}", subscriber)
          end
        end

        # trigger the matching method when an event is captured
        def call(message, *args)
          method  = message.sub("#{namespace}.", '')
          handler = self.class.new(namespace)
          handler.send(method, ActiveSupport::Notifications::Event.new(message, *args))
        end
      end
    end
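The name-to-method mapping in `Subscribers::Base` can be tried without ASN. In this simplified stand-in (the `Sketch*` class names are hypothetical, and `#call` is invoked directly instead of by ASN), only public methods defined on the subclass itself become event names, so helpers inherited from the base class are never subscribed.

```ruby
# Simplified stand-in for Subscribers::Base: ASN is left out and #call is
# invoked directly, so only the name-to-method mapping is shown.
class SketchSubscriberBase
  attr_reader :namespace

  def initialize(namespace)
    @namespace = namespace
  end

  # Event names handled by a subscriber: its own public methods, namespaced.
  # public_instance_methods(false) skips methods inherited from this base class.
  def self.event_names(namespace)
    public_instance_methods(false).map { |name| "#{namespace}.#{name}" }
  end

  # Strip the namespace prefix and dispatch to the method of that name.
  def call(message, payload)
    public_send(message.delete_prefix("#{namespace}."), payload)
  end
end

class SketchMailer < SketchSubscriberBase
  def user_signed_up(payload)
    "welcome mail for #{payload[:email]}"
  end
end

p SketchMailer.event_names("registration") # => ["registration.user_signed_up"]
p SketchMailer.new("registration").call("registration.user_signed_up", { email: "a@b.c" })
# => "welcome mail for a@b.c"
```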

We have created a base class that subscribers can extend. The base class contains the magic that maps events in a namespace to methods on the subscriber. Let's subscribe to the event we have been broadcasting and send the welcome email.

    # app/pub_sub/subscribers/registration_mailer.rb
    module Subscribers
      class RegistrationMailer < ::Subscribers::Base
        def user_signed_up(event)
          # delay the delivery using delayed_job; the leading :: points at the
          # top-level mailer, not at this subscriber class of the same name
          ::RegistrationMailer.delay(priority: 1).welcome_email(event.payload[:user])
        end
      end
    end

    # config/initializers/subscribers.rb
    Subscribers::RegistrationMailer.attach_to('registration')

This looks good: the signup process just broadcasts an event about the user signup and carries on with its tasks. Modules that are not core to the application can then hook onto these events to add extended functionality.

Building a modular, loosely coupled system with message publishing has its own cons, the biggest being that the system can become too loosely coupled. If something stops working, no errors crop up, and it might take a long time to notice. However, I believe failures can happen anywhere, and a system that fails gracefully is more robust than one that fails completely. It is another layer that gets introduced while building scalable systems. Bottom line: for me, the pros outweigh the cons.
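One way to make such failures graceful while still leaving a trace is to rescue and log inside the dispatcher. As far as we know, an exception raised in an ASN subscriber propagates to the code that called `instrument`, so without a rescue a broken mailer could abort the signup itself. The sketch below is standalone plain Ruby; `GracefulDispatch` is a hypothetical name, and in the design above the natural home for such a rescue would be `Subscribers::Base#call`.

```ruby
require "logger"
require "stringio"

# Hypothetical helper: run every handler, log and collect any error, and keep
# going, so one broken subscriber cannot abort the publisher's request.
module GracefulDispatch
  def self.dispatch(handlers, event_name, payload, logger:)
    handlers.each_with_object([]) do |handler, failures|
      begin
        handler.call(event_name, payload)
      rescue StandardError => e
        logger.error("#{event_name} subscriber failed: #{e.class}: #{e.message}")
        failures << e
      end
    end
  end
end

log = StringIO.new
handlers = [
  ->(_name, _payload) { raise IOError, "SMTP unavailable" },              # broken mailer
  ->(_name, payload) { puts "synced posts for user #{payload[:user_id]}" } # still runs
]
failures = GracefulDispatch.dispatch(handlers, "registration.user_signed_up",
                                     { user_id: 1 }, logger: Logger.new(log))
p failures.map(&:class) # => [IOError]
```

Logging every swallowed error is what keeps "fails gracefully" from turning into "fails silently".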

Conclusion: With little modification, our PUBSUB now supports namespaces (read: features, modules, sub-systems, opportunities!). Adding a thin layer on top of the already useful ActiveSupport::Notifications already feels good enough.

Messy callback chain introduced due to denormalization

Problem: We use MongoDB via Mongoid and denormalize data to cut down on queries. Initially it was a few callbacks and method overrides, which grew into modules and then into a simple reusable denormalization plugin. The plugin falls back to querying data from the related model and also caches it for future use. This works like a cache's fetch, calculate, store strategy, with the remote document as the origin and the current document as the cache store. The code looks something like:

    # app/models/user.rb
    class User
      include AlmaConnect::Denormalization
      field :name, type: String
      has_many :posts

      # this sets up a callback for when name changes,
      # it propagates changes to posts where user_id matches the user
      denormalize_to :posts, fields: [:name]
    end

    # app/models/post.rb
    class Post
      include AlmaConnect::Denormalization
      belongs_to :user

      # this sets up a callback to update user_name when user_id changes
      # also fetch and set, user name from user if it is not already set, when accessed
      denormalize_from :user, fields: [:name]
    end
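The fetch, calculate, store fallback that `denormalize_from` relies on can be sketched in plain Ruby. Everything here is hypothetical and not the plugin's code: a Hash stands in for the users collection, the cached `user_name` lives only in memory instead of being persisted, and a counter shows that the origin is queried only on the first read.

```ruby
# Hypothetical stand-ins for the Mongoid documents: a Hash plays the users
# collection (the origin) and the post's user_name field plays the cache.
UserDoc = Struct.new(:id, :name)

class PostDoc
  attr_reader :user_id, :lookups

  def initialize(user_id, users)
    @user_id = user_id
    @users = users     # stand-in for querying the related model
    @user_name = nil   # denormalized copy, empty until first read
    @lookups = 0
  end

  # fetch the stored copy; on a miss, calculate it from the origin and store it
  def user_name
    @user_name ||= begin
      @lookups += 1
      @users.fetch(user_id).name
    end
  end
end

users = { 1 => UserDoc.new(1, "Asha") }
post = PostDoc.new(1, users)
post.user_name # miss: reads the user document
post.user_name # hit: served from the stored copy
p post.lookups # => 1
```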

It has been working out very nicely, but it results in very coupled code. We define denormalization macros in the models, which makes every model aware of what data it denormalizes and where from, as well as what data it must push out and where to. What if we need to denormalize the user name to comments as well?

    # app/models/comment.rb
    class Comment
      include AlmaConnect::Denormalization
      embedded_in :post
      belongs_to :user, inverse_of: nil
      denormalize_from :user, fields: [:name]
    end

    # app/models/post.rb
    class Post
      include AlmaConnect::Denormalization
      belongs_to :user
      embeds_many :comments
      denormalize_from :user, fields: [:name]
    end

    # app/models/user.rb
    class User
      include AlmaConnect::Denormalization
      field :name, type: String
      has_many :posts
      denormalize_to :posts, fields: [:name]

      # we don't define the relation here, still define the denormalization macro
      denormalize_to 'post.comments', fields: [:name]

      # may be many more denormalize_to calls here if comment has a polymorphic parent
    end

You see the problem, right? User does not need to know about comments, but it needs the macro there to sync changes whenever the user name changes.

Solution: Instead of sprinkling denormalization macros all around, we can leave the from macros as they are, since the fallback logic is very handy, and replace the to macros with published change events. We simply publish a message whenever a user name changes. We capture the change in a before_save callback and publish it in after_save; publishing in after_save ensures the user is persisted at the time of publishing. To do this, we change the publisher to hook directly into the model callback chain. Let's call these change events notifications, because they are not wrappers around code blocks; they simply announce that "something changed". Events, in contrast, can be described as: "something happened and it took this much time to do it; oh sorry, it was just attempted, we got this error and it was not completed."
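The capture-before, publish-after sequence can be illustrated with a toy model that has no ActiveModel or Mongoid underneath; `ToyUser` and its `write_ok:` switch are hypothetical. A failed write discards the captured change, and a successful write publishes it once and then clears the queue.

```ruby
# Toy model with hand-written before_save/after_save hooks (hypothetical; no
# ActiveModel or Mongoid). A change is captured before the write and is
# broadcast only after the write succeeds.
class ToyUser
  attr_accessor :name
  attr_reader :broadcasts

  def initialize(name)
    @name = name
    @saved_name = name # last persisted value
    @pending = []      # notifications captured but not yet published
    @broadcasts = []   # what subscribers would have received
  end

  def save(write_ok: true)
    before_save
    unless write_ok # simulated failed write: drop what was captured
      @pending.clear
      return false
    end
    @saved_name = @name
    after_save
    true
  end

  private

  # capture the change while the old value is still known
  def before_save
    return if @name == @saved_name
    @pending << ["user.name_changed", { changes: [@saved_name, @name] }]
  end

  # the record is persisted now: publish, then clear so nothing repeats
  def after_save
    @broadcasts.concat(@pending)
    @pending.clear
  end
end

user = ToyUser.new("Asha")
user.name = "Asha R"
user.save(write_ok: false) # write failed: nothing is broadcast
user.save                  # broadcast once, after the write
user.save                  # nothing changed: nothing new is broadcast
p user.broadcasts.size # => 1
```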

Let's create a publisher module that can be included in any ActiveModel-compliant model supporting callbacks. A model can attach a namespace to itself and assign a publisher to that namespace. The attached publisher should immediately start broadcasting important notifications like created and destroyed. To broadcast additional notifications, the publisher should be able to hook in.

    # app/pub_sub/publishers/base.rb
    # hook into the model callback chain when a publisher is attached to a model,
    # capture changes before_save and publish them in after_save,
    # and start publishing created and destroyed notifications
    module Publishers
      module Base
        extend ActiveSupport::Concern

        # inject a reader to handle notifications for the model in namespaces
        def pub_sub_notifications
          @pub_sub_notifications ||= ::Publishers::PubSubNotifications.new(self)
        end

        module ClassMethods
          def attach_publisher(namespace, publisher_class)
            # attach publisher to model class
            after_initialize do |model|
              model.pub_sub_notifications.attach_publisher(namespace, publisher_class)
            end

            # emit created notification and let the publisher hook in to the notifications
            before_save do |model|
              model.pub_sub_notifications.prepare_created(namespace)
              model.pub_sub_notifications.prepare_notifications(namespace)
            end

            # publish notifications after save
            after_save do |model|
              model.pub_sub_notifications.publish_notifications(namespace)
            end

            # emit destroyed notification and publish notifications
            after_destroy do |model|
              model.pub_sub_notifications.prepare_destroyed(namespace)
              model.pub_sub_notifications.publish_notifications(namespace)
            end
          end
        end
      end
    end

    # app/pub_sub/publishers/notifications_queue.rb
    # capture attachment of a publisher to a model in a namespace
    module Publishers
      class NotificationsQueue
        attr_reader :publisher, :notifications

        def initialize(publisher_name)
          @publisher = publisher_name.to_s.constantize.new
          @notifications = []
        end

        def add_notification(event_name, payload={})
          @notifications << {event_name: event_name, payload: payload}
        end

        def reset_notifications
          @notifications = []
        end
      end
    end

    # app/pub_sub/publishers/pub_sub_notifications.rb
    # handle different attachments of publishers to a model
    module Publishers
      class PubSubNotifications
        include ::Publisher

        attr_reader :publishers_info, :model

        def initialize(model)
          @publishers_info = {}
          @model = model
        end

        def attach_publisher(namespace, publisher_name)
          publishers_info[namespace] ||= Publishers::NotificationsQueue.new(publisher_name)
          true
        end

        def reset_notifications(namespace)
          publishers_info[namespace].reset_notifications
        end

        def add_notification(namespace, event_name, payload={})
          publishers_info[namespace].add_notification(event_name, payload)
        end

        def prepare_created(namespace)
          add_notification(namespace, 'created') if model.new_record?
        end

        def prepare_destroyed(namespace)
          add_notification(namespace, 'destroyed') if model.destroyed?
        end

        def prepare_notifications(namespace)
          publishers_info[namespace].publisher.prepare_notifications(namespace, model)

          return true
        end

        def publish_notifications(namespace)
          publishers_info[namespace].notifications.each do |notification|
            broadcast_event(
                [namespace, notification[:event_name]].compact.join('.'),
                notification[:payload].merge(model: model)
            )
          end
          # clear the queue so the next save does not re-broadcast these notifications
          reset_notifications(namespace)

          return true
        end
      end
    end

The code seems a little difficult to follow, but let's postpone the refactoring for later.

Let's try to implement denormalize_to for the user name. The basic flow is: a publisher hooks into User and starts broadcasting created, destroyed and name_changed events. We are only interested in name changes here, so we implement a subscriber that syncs the user name in the user's posts on a name change, and ignore the created and destroyed events. We can implement another subscriber to sync changes in comments. If we later start denormalizing the user name somewhere else, we can implement an additional subscriber to handle syncing those changes.

    # app/pub_sub/publishers/user.rb
    # a class, not a module: NotificationsQueue instantiates the publisher with .new
    module Publishers
      class User
        def prepare_notifications(namespace, user)
          if user.name_changed?
            # name_change returns [old_name, new_name]
            user.pub_sub_notifications.add_notification(namespace, 'name_changed', changes: user.name_change)
          end
        end
      end
    end

    # app/models/user.rb
    class User
      include Publishers::Base

      attach_publisher('user', Publishers::User)
    end

    class Subscribers::User::Post < Subscribers::Base
      def name_changed(event)
        ::Post.where(user_id: event.payload[:model].id).update_all(user_name: event.payload[:changes].last)
      end
    end

    class Subscribers::User::Comment < Subscribers::Base
      def name_changed(event)
        user_id  = event.payload[:model].id
        new_name = event.payload[:changes].last
        # the positional $ operator updates only the first matching comment in each
        # post, so repeat until no stale comment remains, bounded by a safety limit
        iteration_limit = 100 # or fetch max comment count
        posts = ::Post.where(:comments.matches => {user_id: user_id, :user_name.ne => new_name})
        while iteration_limit > 0 && posts.count > 0
          ::Post.collection.update(posts.selector, {'$set' => {'comments.$.user_name' => new_name}}, multi: true, safe: true)
          iteration_limit -= 1
        end
      end
    end

    # config/initializers/subscribers.rb
    Subscribers::User::Post.attach_to('user')
    Subscribers::User::Comment.attach_to('user')

This is a lot more code, but this system seems robust:

  • Easy to add new denormalization points
  • Complete control of how to handle the change
  • A lot more modular system
  • Easily take out some subscriber or publisher without affecting the system
  • Individual sub-components can be enabled, disabled and changed independently

Conclusion: Adding some more Ruby goodness to the mix, we have built a production-grade iron to straighten out the callback spaghetti we have been cooking recently.

Handling events like user milestone reached and batchmate signed up

Problem: When a user signs up, this can potentially trigger a user milestone, and will most probably trigger a batchmate signed up notification. Where do we keep this code? In the service concerned with creating the user? In a model callback? Nothing seems to fit the bill.

Solution: Lucky for us, we already have our PUBSUB. We can listen to the user.created event from our user publisher and trigger the appropriate behaviour in a subscriber. Sending the welcome mail and the batchmate activity is not core to the application, and should not affect the core if SMTP is not working at the moment. In case you didn't notice, I secretly swapped the milestone notification for the welcome email and left it as an exercise for you. Do share your thoughts on the implementation in the comments below.

    # app/services/registration/user_notifications_service.rb

    # handles delivery and unbundling of notification
    class Registration::UserNotificationsService
      def self.deliver_welcome_email(user)
        RegistrationMailer.welcome(user).deliver
      end

      def self.prepare_batchmate_notification(batchmate)
        User.batchmates_of(batchmate).each do |user|
          Publishers::Registration.broadcast_event('deliver_batchmate_signed_up', batchmate: batchmate, user: user)
        end
      end

      def self.deliver_batchmate_signed_up_email(batchmate, user)
        RegistrationMailer.batchmate_signed_up(user, batchmate).deliver
      end
    end

The notification service has the logic to handle deliveries and to unbundle the batchmate activity notification to the correct recipients. We should probably delay these actions: we do not want to hurt the user's experience because we need to send them a welcome email or let their batchmates know about the signup. All of these things can be done in a background worker.

    # app/jobs/welcome_email_job.rb
    # handles delaying of the welcome email
    class WelcomeEmailJob
      def initialize(user)
        @user = user
      end

      def perform
        Registration::UserNotificationsService.deliver_welcome_email(@user)
      end

      def self.deliver(user)
        Delayed::Job.enqueue(new(user))
      end
    end

    # app/jobs/batchmate_activity_job.rb
    # handles delaying of the batchmate activity notification
    class BatchmateActivityJob
      def initialize(batchmate, user=nil)
        @batchmate = batchmate
        @user = user
      end

      # without a user the job is still bundled: fan it out to all batchmates;
      # with a user it delivers a single email (arguments match the service signatures)
      def perform
        if bundled?
          Registration::UserNotificationsService.prepare_batchmate_notification(@batchmate)
        else
          Registration::UserNotificationsService.deliver_batchmate_signed_up_email(@batchmate, @user)
        end
      end

      def bundled?
        @user.nil?
      end

      def self.prepare(batchmate)
        Delayed::Job.enqueue(new(batchmate))
      end

      def self.deliver(batchmate, user)
        Delayed::Job.enqueue(new(batchmate, user))
      end
    end

We have wrapped the service methods in two jobs, one handling the batchmate notification and one handling the welcome email. These jobs present an interface similar to that of the underlying notification service. Let's implement the subscribers to wire everything up.

    # app/pub_sub/subscribers/registration/user.rb
    class Subscribers::Registration::User < Subscribers::Base
      def created(event)
        # the model publisher adds the created record to the payload as :model
        user = event.payload[:model]
        WelcomeEmailJob.deliver(user)
        BatchmateActivityJob.prepare(user)
      end
    end

    module Subscribers
      class RegistrationMailer < ::Subscribers::Base
        def deliver_batchmate_signed_up(event)
          BatchmateActivityJob.deliver(event.payload[:batchmate], event.payload[:user])
        end
      end
    end

    # config/initializers/subscribers.rb
    Subscribers::Registration::User.attach_to('user')
    Subscribers::RegistrationMailer.attach_to('registration')

Conclusion: We have built a service that can be consumed as an internal API within the project, wrapped it in two jobs so the work can be delayed to background workers, and finally wired everything up using subscribers.

Concluding thoughts

A simple pattern like pub/sub can help us write highly decoupled modules. These loosely coupled modules can be composed in a variety of ways to create a flexible, robust and scalable application. They can be upgraded without affecting the system as long as they adhere to a contract of event names and payload data.
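One lightweight way to make that contract explicit is a registry of required payload keys per event name, checked before broadcasting. This is a hypothetical sketch and not part of the implementation above (`EventContracts` is an invented name); in the `Publisher` module, a check like this could run at the start of `broadcast_event`.

```ruby
# Hypothetical contract registry: each event name maps to the payload keys
# subscribers may rely on. Publishing an event that breaks its contract fails fast.
class EventContracts
  class Violation < StandardError; end

  def initialize
    @required_keys = {}
  end

  def define(event_name, *keys)
    @required_keys[event_name] = keys
  end

  def check!(event_name, payload)
    keys = @required_keys.fetch(event_name) { raise Violation, "unknown event #{event_name}" }
    missing = keys - payload.keys
    raise Violation, "#{event_name} missing #{missing.join(', ')}" unless missing.empty?
    true
  end
end

contracts = EventContracts.new
contracts.define("registration.user_signed_up", :user)
contracts.check!("registration.user_signed_up", { user: "u-1" }) # passes

begin
  contracts.check!("registration.user_signed_up", {})
rescue EventContracts::Violation => e
  puts e.message # => registration.user_signed_up missing user
end
```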

If you liked today's article, keep an eye out for the next one. If you like the implementation and would love to help gemify it, please get in touch at rubish[dot]gupta[at]almaconnect[dot]com or tech[dot]team[at]almaconnect[dot]com

Update (04 Apr, 2014)

I will post the queries and feedback we have received so far. If anybody has something to add, please leave a comment or mail us. Thanks to Kathy Onu and Calinoiu Alexandru Nicolae for helping with grammar and spelling. Shaomeng Zhang, I have added a comment to the question.

Query from Andrey Koleshko:

I've read you article and I really liked it. In general your approach is awesome. But as I see you provide examples with Rails callbacks.

Ideally in tests we have to prevent the execution of callbacks everywhere except one place - this is place where we want to test only publishers' triggers. But as I know Rails doesn't provide some adequate facility to turn on/off callbacks. Any advises to solve the problem?

Reply:

Frankly speaking, our code base is largely untested and the test suite is as good as no tests at all, so I would not be able to suggest anything concrete in this area. We had the current implementation in place for a few weeks before the article, but only for very specific use cases. As we realized the potential applications of PUB/SUB in Rails, I got excited and wrote an article to get some feedback from the community. It is working great for us, but it is in no way the final, stable structure.

I am not sure how to enable or disable callbacks in Rails. However, if you implement the publisher as a base class instead of a module (similar to the subscriber in the mail delivery section), you should be able to implement a class attribute for enabling or disabling individual publishers or all of them.
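A minimal sketch of that suggestion, in plain Ruby with a hand-rolled inheritable flag standing in for ActiveSupport's `class_attribute`. `PublisherBase`, the subclass names and the in-memory `deliveries` list (a stand-in for the call to `ActiveSupport::Notifications.instrument`) are all hypothetical.

```ruby
# Hypothetical publisher base class; a hand-rolled inheritable flag stands in
# for ActiveSupport's class_attribute so the sketch runs in plain Ruby.
class PublisherBase
  class << self
    attr_writer :enabled

    # Use this class's own flag if set, otherwise inherit the parent's;
    # publishers are enabled by default.
    def enabled?
      return @enabled unless @enabled.nil?
      superclass.respond_to?(:enabled?) ? superclass.enabled? : true
    end

    def broadcast_event(event_name, payload = {})
      return false unless enabled?
      deliveries << [event_name, payload] # stand-in for ActiveSupport::Notifications.instrument
      true
    end

    def deliveries
      @deliveries ||= []
    end
  end
end

class RegistrationPublisher < PublisherBase; end
class UserPublisher < PublisherBase; end

PublisherBase.enabled = false # e.g. in a test helper: silence every publisher
p RegistrationPublisher.broadcast_event("user_signed_up") # => false
UserPublisher.enabled = true  # re-enable only the publisher under test
p UserPublisher.broadcast_event("created") # => true
```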

Query from Seif:

Thanks for sharing this awesome post. I just have one question. Does it work in the same session or on a different thread. For example, if the subscription method take 10 seconds to complete, will it affect response time?

Reply:

Yes, this works synchronously. If a subscription takes 10 seconds to complete, it will affect the response time of the request. However, you can start a background task in the subscriber instead of doing the actual work there. The queue for ASN can also be changed. You can look into different options (Redis is the first thing that comes to mind) or implement an ASN-compatible queue yourself to get the desired behavior.

Followup from Seif:

Thanks for the response. About background jobs, I’m using Sidekiq, but its kinda tedious to create a worker for every listener. Any recommended pub-sub that supports threading?

Reply:

My suggestion would be to create the workers; modular code always pays off in the long run. If you are using Sidekiq, you already have Redis in your tech stack. You can implement a subscriber that listens to any event that should be backgrounded and pushes it onto a Redis queue. Another process (a Redis listener) can then pop events from Redis and trigger them in threads. Additional subscribers can be created to handle backgrounded events and attached in the Redis listener.
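Here is an in-process sketch of that design, with Ruby's thread-safe `Queue` standing in for the Redis list and a `Thread` standing in for the separate listener process; it is not a Redis or Sidekiq integration, and the handler table is hypothetical. In a real setup the payload would also need to be serializable, for example record IDs rather than model instances.

```ruby
# In-process stand-ins (hypothetical): Queue plays the Redis list and a Thread
# plays the separate listener process.
events = Queue.new
handled = Queue.new

# Backgrounding subscriber: push the event and return immediately.
enqueue = ->(name, payload) { events << [name, payload] }

# Listener: pop events and run the real handlers off the request thread.
handlers = {
  "user.created" => ->(payload) { handled << "welcome mail for #{payload[:user_id]}" }
}
listener = Thread.new do
  while (item = events.pop) != :stop
    name, payload = item
    handler = handlers[name]
    handler&.call(payload) # events without a handler are skipped
  end
end

enqueue.call("user.created", { user_id: 1 }) # the publisher is not kept waiting
events << :stop
listener.join
result = handled.pop
p result # => "welcome mail for 1"
```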