This article implements a simple publisher/subscriber (PUB/SUB) model in Rails using ActiveSupport::Notifications (ASN). It explores problem areas where PUB/SUB can improve modularity and reduce coupling, and upgrades the basic implementation along the way to cater to common use cases.
- A little about PUB/SUB?
- Implementing basic publisher & subscriber; let's get it on
- Use cases where PUB/SUB can be helpful
- Concluding thoughts
- Update (04 Apr, 2014)
A little about PUB/SUB?
According to Wikipedia:
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers. Instead, published messages are characterized into classes, without knowledge of what, if any, subscribers there may be. Similarly, subscribers express interest in one or more classes, and only receive messages that are of interest, without knowledge of what, if any, publishers there are.
Rails is a good framework for proofs of concept and small applications, and it promotes convention over configuration. As an application grows, however, it becomes difficult to maintain. This is primarily because Rails promotes a coupled architecture with its fat model, skinny controller approach. Developers need to adopt new patterns and create new conventions to keep projects moving at pace and in shape. PUB/SUB can help us resolve some of the problems we often face.
Implementing basic publisher & subscriber; let's get it on
Basic requirements of our PUBSUB are:
- Publishers should be able to publish named events with payload
- Subscribers should be able to subscribe to events matching a name and should receive the payload object with every event
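The two requirements above can be sketched in plain Ruby before bringing in any library. This is only an illustrative stand-in: the class name TinyBus and its methods are made up for this sketch and are not part of Rails or ASN.

```ruby
# Minimal in-memory pub/sub bus (hypothetical stand-in, not ASN).
class TinyBus
  def initialize
    # event name => list of handler blocks
    @subscribers = Hash.new { |hash, name| hash[name] = [] }
  end

  # Register a block to be called for every event published under +name+.
  def subscribe(name, &handler)
    @subscribers[name] << handler
    handler
  end

  # Deliver +payload+ to every handler registered for +name+.
  # Returns the number of handlers that were called.
  def publish(name, payload = {})
    handlers = @subscribers.fetch(name, [])
    handlers.each { |handler| handler.call(name, payload) }
    handlers.size
  end
end

bus = TinyBus.new
received = []
bus.subscribe('user.created') { |name, payload| received << [name, payload[:user]] }

bus.publish('user.created', user: 'alice') # one subscriber is called
bus.publish('user.deleted', user: 'bob')   # nobody listens, nothing happens
```

The publisher never references a subscriber and the subscriber never references a publisher; the event name is the only contract between them.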
Such a system is already bundled with Rails: it is ASN. ASN is used throughout Rails internally for instrumentation. Its basic API is simple yet powerful enough that we can build our PUB/SUB implementation on top of it.
Publisher
# app/pub_sub/publisher.rb
module Publisher
  extend self

  # delegate to ActiveSupport::Notifications.instrument
  def broadcast_event(event_name, payload={})
    if block_given?
      ActiveSupport::Notifications.instrument(event_name, payload) do
        yield
      end
    else
      ActiveSupport::Notifications.instrument(event_name, payload)
    end
  end
end
This is as simple as it gets. The publisher can broadcast an event with a payload, and an optional block can be passed. If a block is passed, the published event also contains information about execution time and any exception raised. All the additional goodness associated with blocks is provided by ASN, the underlying implementation. Interesting!
if user.save
  # publish event 'user.created', with payload {user: user}
  Publisher.broadcast_event('user.created', user: user)
end

def create_user(params)
  user = User.new(params)

  # publish event 'user.created', with payload {user: user}, using block syntax
  # now the event will have additional data about duration and exceptions
  Publisher.broadcast_event('user.created', user: user) do
    # save the instance built above (not the User class)
    user.save!
    # do some more important stuff here
  end
end
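To make the block behaviour more concrete, here is a plain-Ruby sketch of what wrapping a block buys us: the block is timed, an exception is noted in the payload as a [class name, message] pair and then re-raised, and the listener is told either way. The method instrument_sketch and its listener argument are hypothetical; this only mimics the behaviour described above and is not ASN's actual code.

```ruby
# Hypothetical stand-in for the block form of instrument.
def instrument_sketch(listener, name, payload = {})
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  begin
    yield payload if block_given?
  rescue Exception => e
    # note the failure for subscribers, then let the caller see the error
    payload[:exception] = [e.class.name, e.message]
    raise
  ensure
    # runs on success and on failure, so the listener always hears about it
    finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    listener.call(name: name, duration_ms: (finished - started) * 1000.0, payload: payload)
  end
end

events = []
listener = ->(event) { events << event }

instrument_sketch(listener, 'user.created', user: 'alice') { :saved }

begin
  instrument_sketch(listener, 'user.created', user: 'bob') { raise ArgumentError, 'name is blank' }
rescue ArgumentError
  # the error still reaches the caller; the listener has already seen the event
end
```

Both calls produce an event; only the second one carries an :exception entry in its payload, which is what a subscriber can check for.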
Subscriber
# app/pub_sub/subscriber.rb
module Subscriber
  # delegate to ActiveSupport::Notifications.subscribe
  def self.subscribe(event_name)
    if block_given?
      ActiveSupport::Notifications.subscribe(event_name) do |*args|
        event = ActiveSupport::Notifications::Event.new(*args)
        yield(event)
      end
    end
  end
end
Subscriber can be used to subscribe to a named event and passed block will receive an instance of ActiveSupport::Notifications::Event as parameter.
# subscriber example usage
Subscriber.subscribe('user.created') do |event|
  error = "Error: #{event.payload[:exception].first}" if event.payload[:exception]
  puts "#{event.transaction_id} | #{event.name} | #{event.time} | #{event.duration} | #{event.payload[:user].id} | #{error}"
end
Use cases where PUB/SUB can be helpful
These are some problems we face in our routine work, which can be solved by PUBSUB:
- Mail deliveries coupled with code
- Messy callback chain introduced due to denormalization
- Handling of events like user milestone reached, batchmate signed up
Mail deliveries coupled with code
Problem: There are always some kind of notifications at work in any user-centric application. Generally this starts with an after_save model callback and slowly moves out of there to a services layer. Let's explore how we can handle welcome emails using PUB/SUB.
Solution: Since we are adding this pattern to our application, let's bake modularity into the core usage. For the time being, a simple string namespace should be good enough.
# app/pub_sub/publisher.rb
module Publisher
  extend ::ActiveSupport::Concern

  included do
    # add support for namespace, one class - one namespace
    class_attribute :pub_sub_namespace

    self.pub_sub_namespace = nil
  end

  # delegate to class method
  def broadcast_event(event_name, payload={})
    if block_given?
      self.class.broadcast_event(event_name, payload) do
        yield
      end
    else
      self.class.broadcast_event(event_name, payload)
    end
  end

  module ClassMethods
    # delegate to ASN, prefixing the event name with the namespace (if any)
    def broadcast_event(event_name, payload={})
      event_name = [pub_sub_namespace, event_name].compact.join('.')
      if block_given?
        ActiveSupport::Notifications.instrument(event_name, payload) do
          yield
        end
      else
        ActiveSupport::Notifications.instrument(event_name, payload)
      end
    end
  end
end
The publisher underwent some changes. It can now be included in a class, and after setting a namespace you are all set to publish events in that namespace. Let's start broadcasting the event user_signed_up in the registration namespace.
# app/pub_sub/publishers/registration.rb
module Publishers
  class Registration
    include Publisher

    self.pub_sub_namespace = 'registration'
  end
end

# broadcast event
if user.save
  Publishers::Registration.broadcast_event('user_signed_up', user: user)
end
The publisher seems to be good; let's add some Ruby goodness to subscribers too.
# app/pub_sub/subscribers/base.rb
module Subscribers
  class Base
    class_attribute :subscriptions_enabled
    attr_reader :namespace

    def initialize(namespace)
      @namespace = namespace
    end

    # attach public methods of subscriber with events in the namespace
    def self.attach_to(namespace)
      log_subscriber = new(namespace)
      log_subscriber.public_methods(false).each do |event|
        ActiveSupport::Notifications.subscribe("#{namespace}.#{event}", log_subscriber)
      end
    end

    # trigger methods when an event is captured
    def call(message, *args)
      method = message.gsub("#{namespace}.", '')
      handler = self.class.new(namespace)
      handler.send(method, ActiveSupport::Notifications::Event.new(message, *args))
    end
  end
end
We have created a base class which subscribers can extend. The base class contains the magic to map events in a namespace to methods in the subscriber. Let's subscribe to the event we have been broadcasting and send the welcome email.
# app/pub_sub/subscribers/registration_mailer.rb
module Subscribers
  class RegistrationMailer < ::Subscribers::Base
    def user_signed_up(event)
      # let's delay the delivery using delayed_job
      # the leading :: points at the mailer, not at this subscriber class of the same name
      ::RegistrationMailer.delay(priority: 1).welcome_email(event.payload[:user])
    end
  end
end

# config/initializers/subscribers.rb
Subscribers::RegistrationMailer.attach_to('registration')
This looks good: the signup process just broadcasts an event about the user signup and carries on with its tasks. Modules which are not core to the application can then hook onto these events to attach extended functionality.
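The mapping from event names to subscriber methods can be tried out in isolation. The following is a plain-Ruby sketch of that idea only: NamespaceRouter and WelcomeRouter are hypothetical names, and the payload is passed as a bare hash instead of an ASN event object.

```ruby
# Hypothetical sketch: route "namespace.event" names to methods of the same name.
class NamespaceRouter
  attr_reader :namespace

  def initialize(namespace)
    @namespace = namespace
  end

  # One event name per public method defined directly on the subclass.
  def event_names
    handler_names.map { |method_name| "#{namespace}.#{method_name}" }
  end

  # Strip the namespace prefix and dispatch to the matching handler, if any.
  def call(event_name, payload)
    method_name = event_name.delete_prefix("#{namespace}.").to_sym
    return nil unless handler_names.include?(method_name)

    public_send(method_name, payload)
  end

  private

  # Only methods declared on the concrete subclass count as handlers.
  def handler_names
    self.class.public_instance_methods(false)
  end
end

class WelcomeRouter < NamespaceRouter
  def user_signed_up(payload)
    "welcome #{payload[:user]}"
  end
end

router = WelcomeRouter.new('registration')
names = router.event_names
greeting = router.call('registration.user_signed_up', user: 'alice')
ignored = router.call('registration.unknown', user: 'alice') # no such handler
```

Restricting dispatch to methods declared on the subclass keeps inherited helpers such as call or namespace from being treated as event handlers.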
Building a modular, loosely coupled system with message publishing has its own cons. The biggest is that the system is too loosely coupled: if something stops working, no errors crop up, and it might take too long to identify the failure. However, I believe that failures can happen anywhere, and a system which fails with grace is more robust than one which fails completely. It is another layer that gets introduced while building scalable systems. Bottom line: for me, the pros outweigh the cons.
Conclusion: With a few modifications, our PUB/SUB now supports namespaces (read features, modules, sub-systems, opportunities!). Adding a layer on top of the already useful ActiveSupport::Notifications already feels good enough.
Messy callback chain introduced due to denormalization
Problem: We use MongoDB via Mongoid and denormalize data to cut down on queries. Initially this was a few callbacks and method overrides, which grew into modules and then a simple reusable denormalization plugin. The plugin falls back to querying data from the related model and also caches it for future use. This works like a fetch, calculate, store caching strategy, with the remote document as the origin and the current document as the cache store. The code looks something like:
# app/models/user.rb
class User
  include AlmaConnect::Denormalization
  field :name, type: String
  has_many :posts

  # this sets up a callback for when name changes,
  # it propagates changes to posts where user_id matches the user
  denormalize_to :posts, fields: [:name]
end

# app/models/post.rb
class Post
  include AlmaConnect::Denormalization
  belongs_to :user

  # this sets up a callback to update user_name when user_id changes
  # also fetch and set, user name from user if it is not already set, when accessed
  denormalize_from :user, fields: [:name]
end
It has been working out very nicely, but it results in very coupled code. We define denormalization macros in the models. This makes all our models aware of what data they are denormalizing, where from and also what data they need to denormalize, where to. What if we need to denormalize user name to comment as well?
# app/models/comment.rb
class Comment
  include AlmaConnect::Denormalization
  embedded_in :post
  belongs_to :user, inverse_of: nil
  denormalize_from :user, fields: [:name]
end

# app/models/post.rb
class Post
  include AlmaConnect::Denormalization
  belongs_to :user
  embeds_many :comments
  denormalize_from :user, fields: [:name]
end

# app/models/user.rb
class User
  include AlmaConnect::Denormalization
  field :name, type: String
  has_many :posts
  denormalize_to :posts, fields: [:name]

  # we don't define the relation here, still define the denormalization macro
  denormalize_to 'post.comments', fields: [:name]

  # may be many more denormalize_to calls here if comment has a polymorphic parent
end
You see the problem, right? User should not need any knowledge about comments, but it needs the macro there to sync changes whenever the user name changes.
Solution: Instead of sprinkling denormalization macros all around, we can leave the from macros as they are, since the fallback logic is very handy. We can replace the to macros with publishing change events. We would simply publish a message whenever a user name changes. We would capture the change in a before_save callback and publish it in after_save. By publishing in after_save we ensure that the user is persisted at the time of publishing. We can change the publisher to hook directly into the model callback chain. Let's call the change events notifications, because they are not wrappers around code blocks. They are simply notifications that "something changed". Events can be described as: "something happened and it took this much time to do it, oh sorry! it was just attempted, we got this error and it was not completed."
Let's create a publisher module which can be included in any ActiveModel-compliant model that supports callbacks. The model can attach a namespace to itself and assign a publisher to the namespace. The attached publisher should immediately start broadcasting important notifications like created and destroyed. To broadcast additional notifications, the publisher should be able to hook in.
# app/pub_sub/publishers/base.rb
# hook into model callback chain when a publisher is attached to a model
# capture changes before_save and publish in after_save
# start publishing created and destroyed notification
module Publishers
  module Base
    extend ActiveSupport::Concern

    # inject a reader to handle notifications for model in namespaces
    def pub_sub_notifications
      @pub_sub_notifications ||= ::Publishers::PubSubNotifications.new(self)
    end

    module ClassMethods
      def attach_publisher(namespace, publisher_class)
        # attach publisher to model class
        after_initialize do |model|
          model.pub_sub_notifications.attach_publisher(namespace, publisher_class)
        end

        # emit created notification and let the publisher hook into the notifications
        before_save do |model|
          model.pub_sub_notifications.prepare_created(namespace)
          model.pub_sub_notifications.prepare_notifications(namespace)
        end

        # publish notifications after save
        after_save do |model|
          model.pub_sub_notifications.publish_notifications(namespace)
        end

        # emit destroy notification and publish notifications
        after_destroy do |model|
          model.pub_sub_notifications.prepare_destroyed(namespace)
          model.pub_sub_notifications.publish_notifications(namespace)
        end
      end
    end

  end
end

# app/pub_sub/publishers/notifications_queue.rb
# capture attachment of a publisher to a model in a namespace
module Publishers
  class NotificationsQueue
    attr_reader :publisher, :notifications

    def initialize(publisher_name)
      # the publisher must be a class, since it is instantiated here
      @publisher = publisher_name.to_s.constantize.new
      @notifications = []
    end

    def add_notification(event_name, payload={})
      @notifications << {event_name: event_name, payload: payload}
    end

    def reset_notifications
      @notifications = []
    end
  end
end

# app/pub_sub/publishers/pub_sub_notifications.rb
# handle different attachments of publishers to a model
module Publishers
  class PubSubNotifications
    include ::Publisher

    attr_reader :publishers_info, :model

    def initialize(model)
      @publishers_info = {}
      @model = model
    end

    def attach_publisher(namespace, publisher_name)
      publishers_info[namespace] ||= Publishers::NotificationsQueue.new(publisher_name)
      true
    end

    def reset_notifications(namespace)
      publishers_info[namespace].reset_notifications
    end

    def add_notification(namespace, event_name, payload={})
      publishers_info[namespace].add_notification(event_name, payload)
    end

    def prepare_created(namespace)
      add_notification(namespace, 'created') if model.new_record?
    end

    def prepare_destroyed(namespace)
      add_notification(namespace, 'destroyed') if model.destroyed?
    end

    def prepare_notifications(namespace)
      publishers_info[namespace].publisher.prepare_notifications(namespace, model)

      return true
    end

    def publish_notifications(namespace)
      publishers_info[namespace].notifications.each do |notification|
        broadcast_event(
          [namespace, notification[:event_name]].compact.join('.'),
          notification[:payload].merge(model: model)
        )
      end

      # clear the queue so a later save does not publish these again
      reset_notifications(namespace)

      return true
    end

  end
end
The code seems a little difficult to follow, but let's postpone the refactoring for later.
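Stripped of the Rails callbacks, the flow is: queue notifications before the save, publish them after the save, and empty the queue. Here is a plain-Ruby sketch of just that flow; ChangeTracker and its method names are hypothetical and stand in for the before_save and after_save hooks.

```ruby
# Hypothetical sketch of "capture before save, publish after save".
class ChangeTracker
  def initialize(namespace)
    @namespace = namespace
    @pending = []
  end

  # "before_save" step: remember what changed, publish nothing yet.
  def prepare(event_name, payload = {})
    @pending << { event_name: event_name, payload: payload }
  end

  # "after_save" step: hand every queued notification to the block,
  # then forget them. Returns how many notifications were published.
  def flush
    pending, @pending = @pending, []
    pending.each do |notification|
      yield "#{@namespace}.#{notification[:event_name]}", notification[:payload]
    end
    pending.size
  end
end

tracker = ChangeTracker.new('user')
published = []

tracker.prepare('name_changed', changes: ['Old Name', 'New Name'])
# ... the record would be saved here ...
first_count = tracker.flush { |name, payload| published << [name, payload] }
second_count = tracker.flush { |name, payload| published << [name, payload] } # queue is empty
```

Because nothing is published until flush runs, a save that never completes never announces a change, which is the reason for splitting the work across the two callbacks.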
Let's try to implement denormalize_to for the user name. The basic flow is: a publisher hooks into user and starts broadcasting created, destroyed and name_changed events. We are only interested in the name change here, so we implement a subscriber that syncs the user name in the user's posts on name change and ignores the created and destroyed events. We can implement another subscriber to sync changes in comments. If in future we start denormalizing the user name to someplace else, we can implement an additional subscriber to handle syncing of those changes.
# app/pub_sub/publishers/user.rb
module Publishers
  # a class rather than a module, because NotificationsQueue instantiates it
  class User
    def prepare_notifications(namespace, user)
      if user.name_changed?
        # name_change returns [old_value, new_value]
        user.pub_sub_notifications.add_notification(namespace, 'name_changed', changes: user.name_change)
      end
    end
  end
end

# app/models/user.rb
class User
  include Publishers::Base

  attach_publisher('user', Publishers::User)
end

class Subscribers::User::Post < Subscribers::Base
  def name_changed(event)
    ::Post.where(user_id: event.payload[:model].id).update_all(user_name: event.payload[:changes].last)
  end
end

class Subscribers::User::Comment < Subscribers::Base
  def name_changed(event)
    iteration_limit = 100 # or fetch max comment count
    posts = ::Post.all.where(:comments.matches => {user_id: event.payload[:model].id, :user_name.ne => event.payload[:changes].last})
    # each pass updates one matching comment per post, so repeat until none are left
    while iteration_limit > 0 && posts.count > 0
      ::Post.collection.update(posts.selector, {'$set' => {'comments.$.user_name' => event.payload[:changes].last} }, multi: true, safe: true)
      iteration_limit -= 1
    end
  end
end

# config/initializers/subscribers.rb
Subscribers::User::Post.attach_to('user')
Subscribers::User::Comment.attach_to('user')
This is a lot more code, but this system seems robust:
- Easy to add new denormalization points
- Complete control of how to handle the change
- A lot more modular system
- Easily take out some subscriber or publisher without affecting the system
- Individual sub-components can be enabled, disabled and changed independently
Conclusion: Adding some more ruby goodness to the mix, we have built a production grade iron to straighten out the callback spaghetti we have been cooking recently.
Handling of events like user milestone reached, batchmate signed up
Problem: A user signs up, this can potentially trigger a user milestone and would most probably trigger batchmate signed up notification. Where do we keep this code? In the service concerned with creation of user? In model callback? Nothing seems to fit the bill.
Solution: Lucky for us, we already have our PUB/SUB. We can listen to the user.created event from our user publisher and trigger the appropriate behaviour in a subscriber. Sending the welcome mail and batchmate activity is not core to the application and should not affect the core if SMTP is not working at the moment. In case you didn't notice, I secretly switched out the milestone notification with the welcome email and left it as an exercise for you. Do share your thoughts on the implementation in the comments below.
# app/services/registration/user_notifications_service.rb

# handles delivery and unbundling of notification
class Registration::UserNotificationsService
  def self.deliver_welcome_email(user)
    RegistrationMailer.welcome(user).deliver
  end

  def self.prepare_batchmate_notification(batchmate)
    User.batchmates_of(batchmate).each do |user|
      Publishers::Registration.broadcast_event('deliver_batchmate_signed_up', batchmate: batchmate, user: user)
    end
  end

  def self.deliver_batchmate_signed_up_email(batchmate, user)
    RegistrationMailer.batchmate_signed_up(user, batchmate).deliver
  end
end
The notification service has the logic to handle deliveries and to unbundle the batchmate activity notification to the correct recipients. We should probably delay these actions: we do not want to affect the user's experience because we need to send a welcome email or let batchmates know about the signup. All these things can be done in a background worker.
# app/jobs/welcome_email_job.rb
# handles delaying of welcome email
class WelcomeEmailJob
  def initialize(user)
    @user = user
  end

  def perform
    Registration::UserNotificationsService.deliver_welcome_email(@user)
  end

  def self.deliver(user)
    Delayed::Job.enqueue(new(user))
  end
end

# app/jobs/batchmate_activity_job.rb
# handles delaying of batchmate activity notification
class BatchmateActivityJob
  def initialize(batchmate, user=nil)
    @batchmate = batchmate
    @user = user
  end

  def perform
    if bundled?
      # no recipient yet: fan out one notification per batchmate
      Registration::UserNotificationsService.prepare_batchmate_notification(@batchmate)
    else
      # argument order matches the service: (batchmate, user)
      Registration::UserNotificationsService.deliver_batchmate_signed_up_email(@batchmate, @user)
    end
  end

  def bundled?
    @user.nil?
  end

  def self.prepare(batchmate)
    Delayed::Job.enqueue(new(batchmate))
  end

  def self.deliver(batchmate, user)
    Delayed::Job.enqueue(new(batchmate, user))
  end
end
We have wrapped the service methods in two jobs, one handling the batchmate notification and one handling the welcome email. These jobs present an interface similar to that of the underlying notification service. Let's implement the subscribers to wire everything up.
# app/pub_sub/subscribers/registration/user.rb
class Subscribers::Registration::User < Subscribers::Base
  def created(event)
    # the user publisher adds the saved model to every payload
    user = event.payload[:model]
    WelcomeEmailJob.deliver(user)
    BatchmateActivityJob.prepare(user)
  end
end

module Subscribers
  class RegistrationMailer < ::Subscribers::Base
    def deliver_batchmate_signed_up(event)
      BatchmateActivityJob.deliver(event.payload[:batchmate], event.payload[:user])
    end
  end
end

# config/initializers/subscribers.rb
Subscribers::Registration::User.attach_to('user')
Subscribers::RegistrationMailer.attach_to('registration')
Conclusion: We have built a service which can be consumed as an internal API within the project. We have built two jobs wrapping that API so the work can be delayed to background workers. Finally, we wired everything up using subscribers.
Concluding thoughts
A simple pattern like pub/sub can help us write highly decoupled modules. These loosely coupled modules can be composed together in a variety of ways to create a flexible, robust and scalable application. The modules can be upgraded without affecting the rest of the system, as long as they adhere to a contract of event names and payload data.
If you liked today's article, keep looking for the next one. If you like the implementation and would love to help in gemifying it, please get in touch at rubish[dot]gupta[at]almaconnect[dot]com or tech[dot]team[at]almaconnect[dot]com
Update (04 Apr, 2014)
I will post the queries and feedback we have received till now. If anybody has something to add, please leave a comment or mail us. Thanks to Kathy Onu and Calinoiu Alexandru Nicolae for helping with grammar and spelling. Shaomeng Zhang, I have added a comment to the question.
Query from Andrey Koleshko:
I've read you article and I really liked it. In general your approach is awesome. But as I see you provide examples with Rails callbacks.
Ideally in tests we have to prevent the execution of callbacks everywhere except one place - this is place where we want to test only publishers' triggers. But as I know Rails doesn't provide some adequate facility to turn on/off callbacks. Any advises to solve the problem?
Reply:
Frankly speaking, our code base is very much untested and the test suite is as good as no tests at all, so I would not be able to suggest anything concrete in this area. We had the current implementation in place for a few weeks before the article, but it was for very specific use cases. As we realized the potential applications of PUB/SUB in Rails, I got excited and wrote an article to get some feedback from the community. It is working great for us, but it is in no way the final stable structure.
I am not sure how to enable/disable callbacks in Rails. However, if you implement the publisher as a base class instead of a module (similar to the subscriber in Mail Delivery Coupling), you should be able to implement a class attribute for enabling/disabling individual publishers or all of them.
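A rough plain-Ruby sketch of such a switch follows. Everything here is hypothetical (SwitchablePublisher, enabled, broadcast, deliveries); in a Rails application a class_attribute could play the role of the hand-written lookup, and broadcast would delegate to ASN instead of recording into an array.

```ruby
# Hypothetical base class with an on/off switch per publisher.
class SwitchablePublisher
  class << self
    attr_writer :enabled

    # A class without its own setting inherits the parent's; the default is on.
    def enabled?
      return @enabled unless @enabled.nil?

      superclass.respond_to?(:enabled?) ? superclass.enabled? : true
    end

    # Records the event only when this publisher is enabled.
    def broadcast(event_name, payload = {})
      return false unless enabled?

      deliveries << [event_name, payload]
      true
    end

    def deliveries
      @deliveries ||= []
    end
  end
end

class UserPublisher < SwitchablePublisher; end
class PostPublisher < SwitchablePublisher; end

UserPublisher.enabled = false                        # silence one publisher, e.g. in a test
user_sent = UserPublisher.broadcast('user.created')
post_sent = PostPublisher.broadcast('post.created')

SwitchablePublisher.enabled = false                  # silence every publisher without its own setting
post_sent_later = PostPublisher.broadcast('post.updated')
```

A test suite could switch everything off in its setup and enable only the publisher under test.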
Query from Seif:
Thanks for sharing this awesome post. I just have one question. Does it work in the same session or on a different thread. For example, if the subscription method take 10 seconds to complete, will it affect response time?
Reply:
Yes, this will work synchronously. If a subscription takes 10 seconds to complete, it will affect the response time for the request. However, you can initiate a background task in the subscriber instead of actually doing the work there. The queue behind ASN can also be changed. You can look into different options (Redis is the first thing that comes to my mind) or implement an ASN-compatible queue yourself to get the desired behavior.
Followup from Seif:
Thanks for the response. About background jobs, I’m using Sidekiq, but its kinda tedious to create a worker for every listener. Any recommended pub-sub that supports threading?
Reply:
My suggestion would be to create the workers; modular code is always good in the long run. If you are using Sidekiq, you already have Redis in your tech stack. You can implement a subscriber that listens to any event which should be backgrounded and puts it in a Redis queue. You can have another process (a Redis listener) pop events from Redis and trigger them in threads. Additional subscribers can be created to handle backgrounded events and can be attached in the Redis listener.
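The shape of that setup can be sketched with Ruby's built-in Queue and a thread standing in for Redis and the separate listener process. This is only an in-process illustration under that assumption; the names events, handled and enqueue are made up for the sketch.

```ruby
# In-process stand-ins: a Queue instead of Redis, a thread instead of a listener process.
events = Queue.new
handled = Queue.new

# "listener": pops events and does the slow work away from the request
worker = Thread.new do
  while (event = events.pop) != :stop
    handled << "#{event[:name]} handled for #{event[:payload][:user]}"
  end
end

# "subscriber": does no work itself, it only enqueues the event and returns
enqueue = ->(name, payload) { events << { name: name, payload: payload } }

enqueue.call('user.created', user: 'alice')
enqueue.call('user.created', user: 'bob')

events << :stop # tell the worker to finish once the queue is drained
worker.join

results = []
results << handled.pop until handled.empty?
```

The publishing side only pays for the enqueue, so a slow handler no longer adds to the response time.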