require "json"
require "mutex"
alias Payload = JSON::Any
# A message published on the bus: a dot-separated topic, a JSON payload and
# the time at which the event was created.
struct Event
  getter topic : String
  getter payload : Payload
  getter timestamp : Time

  # Builds an event. `timestamp` defaults to the current local time; it is
  # accepted explicitly so that `from_json` can restore a serialized event.
  def initialize(@topic : String, @payload : Payload, @timestamp : Time = Time.local)
  end

  # Serializes the event as a JSON object with the keys "topic", "payload"
  # and "timestamp" (whole seconds since the Unix epoch).
  def to_json : String
    JSON.build { |json| to_json(json) }
  end

  # Writes the event into an existing builder, so that an Event can also be
  # emitted as part of a larger JSON document.
  def to_json(json : JSON::Builder) : Nil
    json.object do
      json.field "topic", topic
      json.field "payload", payload
      # Kept as Int64: narrowing to Int32 overflows for dates from 2038 on.
      json.field "timestamp", timestamp.to_unix
    end
  end

  # Parses an event from the format written by `to_json`.
  #
  # The "timestamp" field is restored when it is present and an integer;
  # otherwise the event is stamped with the current time. Raises if the
  # document is not an object or lacks a string "topic" or a "payload" key.
  def self.from_json(json_str : String) : Event
    parsed = JSON.parse(json_str).as_h
    topic = parsed["topic"].as_s
    payload = parsed["payload"]
    if secs = parsed["timestamp"]?.try(&.as_i64?)
      Event.new(topic, payload, Time.unix(secs).to_local)
    else
      Event.new(topic, payload)
    end
  end
end
# Matches dot-separated topics against subscription patterns.
module TopicMatcher
  # Returns true when `topic` satisfies `pattern`.
  #
  # Rules, applied segment by segment after splitting both on ".":
  # * the whole pattern "*" matches every topic;
  # * a "#" segment matches everything from its position on, including
  #   the case where the topic has no segment left at that position;
  # * a "+" segment matches exactly one topic segment, whatever its text;
  # * any other segment must equal the topic segment at the same position.
  # Without a "#", pattern and topic must have the same number of segments.
  def self.matches?(pattern : String, topic : String) : Bool
    return true if pattern == "*"

    wanted = pattern.split(".")
    actual = topic.split(".")

    wanted.each_with_index do |segment, idx|
      return true if segment == "#"
      # The topic ran out before the pattern did.
      return false if idx >= actual.size
      return false unless segment == "+" || segment == actual[idx]
    end

    # Every pattern segment matched; the topic must not have extra segments.
    wanted.size == actual.size
  end
end
# A registered listener: a numeric id, the topic patterns it is interested
# in, and the channel on which matching events are handed to it.
class Subscriber
  getter id : Int32
  getter patterns : Array(String)
  getter ch : Channel(Event)

  # `buffer` is passed to `Channel(Event).new` as the channel capacity.
  def initialize(@id : Int32, @patterns : Array(String), buffer = 10)
    @ch = Channel(Event).new(buffer)
  end

  # True when at least one of this subscriber's patterns matches `topic`.
  def matches?(topic : String) : Bool
    @patterns.each do |candidate|
      return true if TopicMatcher.matches?(candidate, topic)
    end
    false
  end
end
# In-process publish/subscribe hub.
#
# Published events go through an unbuffered inbox channel to a dispatcher
# fiber, which forwards each event to the channel of every subscriber whose
# patterns match the topic. Each subscriber has its own fiber that receives
# from that channel and invokes the subscriber's handler.
class EventBus
  # Nilable so that it can be assigned last in `initialize`, after every
  # other instance variable is set and `self` may be captured by the fiber.
  @dispatcher_fiber : Fiber? = nil
  # Optional callback invoked when a subscriber handler raises.
  @error_handler : Proc(Subscriber, Event, Exception, Nil)? = nil

  def initialize
    @inbox = Channel(Event).new
    @subs = [] of Subscriber
    @mutex = Mutex.new
    @closing = false
    @next_sub_id = 1_i32
    @dispatcher_fiber = spawn { run }
  end

  # Hands `event` to the dispatcher. Blocks until the dispatcher receives it
  # (the inbox is unbuffered). Raises once shutdown has begun.
  def publish(event : Event)
    raise "EventBus is shutting down, cannot publish" if @closing
    @inbox.send(event)
  end

  # Registers `handler` for one pattern or a list of patterns and returns the
  # new Subscriber. `buffer` is the capacity of the subscriber's channel.
  # Raises if the bus is shutting down.
  def subscribe(patterns : String | Array(String), buffer = 10, &handler : Proc(Event, Nil))
    pattern_list = patterns.is_a?(String) ? [patterns] : patterns

    # The closing check, id allocation and registration share one critical
    # section so that a concurrent shutdown cannot slip in between them and
    # leave behind a subscriber whose channel is never closed.
    sub = @mutex.synchronize do
      raise "EventBus is shutting down" if @closing
      created = Subscriber.new(@next_sub_id, pattern_list, buffer)
      @next_sub_id += 1
      @subs << created
      created
    end

    spawn { consume(sub, handler) }
    sub
  end

  # Installs the callback invoked with (subscriber, event, exception) when a
  # handler raises. Without one, the error message is written to STDERR.
  def set_error_handler(&block : Proc(Subscriber, Event, Exception, Nil))
    @error_handler = block
  end

  # Removes `sub` from the bus and closes its channel, which ends its fiber
  # once the events already buffered in the channel have been handled.
  def unsubscribe(sub : Subscriber)
    @mutex.synchronize do
      @subs.delete(sub)
    end
    sub.ch.close
  end

  # Stops the bus: rejects new publishes/subscriptions, closes the inbox,
  # waits up to `timeout_seconds` for the dispatcher fiber to finish, then
  # closes and forgets every subscriber channel. Later calls are no-ops.
  def shutdown(timeout_seconds = 5_f32)
    @mutex.synchronize do
      return if @closing
      @closing = true
    end

    @inbox.close

    if fiber = @dispatcher_fiber
      started = Time.monotonic
      until fiber.dead?
        # total_seconds is the whole elapsed span; Span#seconds would only be
        # its 0..59 seconds component and never exceed a timeout >= 60.
        break if (Time.monotonic - started).total_seconds > timeout_seconds
        sleep(0.01.seconds)
      end
    end

    @mutex.synchronize do
      @subs.each(&.ch.close)
      @subs.clear
    end
  end

  # Dispatcher loop: runs until the inbox is closed by `shutdown`.
  private def run
    loop do
      event = @inbox.receive
      # Snapshot the recipients under the lock; deliver outside of it.
      recipients = @mutex.synchronize { @subs.select(&.matches?(event.topic)) }
      recipients.each { |sub| deliver(sub, event) }
    end
  rescue Channel::ClosedError
    # Inbox closed: normal termination.
  end

  # Sends `event` to `sub` from a separate fiber so that a subscriber with a
  # full channel does not stall the dispatcher.
  private def deliver(sub : Subscriber, event : Event)
    spawn do
      begin
        sub.ch.send(event)
      rescue Channel::ClosedError
        # The subscriber was removed or the bus shut down; drop the event.
      end
    end
  end

  # Subscriber loop: feeds every received event to `handler` until the
  # subscriber's channel is closed. Handler errors never end the loop.
  private def consume(sub : Subscriber, handler : Proc(Event, Nil))
    loop do
      event = sub.ch.receive
      begin
        handler.call(event)
      rescue ex
        report_handler_error(sub, event, ex)
      end
    end
  rescue Channel::ClosedError
    # Channel closed by unsubscribe/shutdown: normal termination.
  end

  # Routes a handler failure to the installed error handler, or to STDERR
  # when none is installed or the error handler itself raises.
  private def report_handler_error(sub : Subscriber, event : Event, ex : Exception)
    if on_error = @error_handler
      begin
        on_error.call(sub, event, ex)
      rescue err2
        STDERR.puts "Subscriber error handler failed: #{err2.message}"
      end
    else
      STDERR.puts "Subscriber ##{sub.id} handler error: #{ex.message}"
    end
  end
end
# Demo: wires a few subscribers to a bus, publishes sample events (including
# one whose handler raises), then shuts the bus down.
bus = EventBus.new

bus.set_error_handler do |failed_sub, failed_event, error|
  STDERR.puts "Error in subscriber ##{failed_sub.id} handling #{failed_event.topic}: #{error.class} - #{error.message}"
end

all = bus.subscribe("*") { |evt| puts "[*] #{evt.topic}: #{evt.payload.inspect}" }
user_any = bus.subscribe("user.#") { |evt| puts "[user.#] #{evt.topic}: #{evt.payload.inspect}" }
login_specific = bus.subscribe("user.login") { |evt| puts "[user.login] #{evt.payload.inspect}" }
single_level = bus.subscribe("order.+.created") { |evt| puts "[order.+.created] got #{evt.topic}" }

# Topic / raw JSON payload pairs, published in this order.
[
  {"user.signup", %({"email": "alice@example.com"})},
  {"user.signup.confirm", %({"email": "alice@example.com"})},
  {"user.login", %({"id": 123, "method": "oauth"})},
  {"order.us.created", %({"order_id": "o-1", "total": 42})},
  {"order.eu.created", %({"order_id": "o-2", "total": 55})},
].each do |topic, raw_json|
  bus.publish Event.new(topic, JSON.parse(raw_json))
end

# A handler that always raises, to exercise the error handler above.
bus.subscribe("bad.handler") { |evt| raise "boom for #{evt.topic}" }
bus.publish Event.new("bad.handler", JSON.parse(%(null)))

# Give the subscriber fibers a moment to run before stopping the bus.
sleep(0.1.seconds)
bus.shutdown(2.0)
puts "Shutdown complete."
# Source type: .cr — Crystal (text/x-crystal). The listing below repeats the program above.
require "json"
require "mutex"
alias Payload = JSON::Any
# A message published on the bus: a dot-separated topic, a JSON payload and
# the time at which the event was created.
struct Event
  getter topic : String
  getter payload : Payload
  getter timestamp : Time

  # Builds an event. `timestamp` defaults to the current local time; it is
  # accepted explicitly so that `from_json` can restore a serialized event.
  def initialize(@topic : String, @payload : Payload, @timestamp : Time = Time.local)
  end

  # Serializes the event as a JSON object with the keys "topic", "payload"
  # and "timestamp" (whole seconds since the Unix epoch).
  def to_json : String
    JSON.build { |json| to_json(json) }
  end

  # Writes the event into an existing builder, so that an Event can also be
  # emitted as part of a larger JSON document.
  def to_json(json : JSON::Builder) : Nil
    json.object do
      json.field "topic", topic
      json.field "payload", payload
      # Kept as Int64: narrowing to Int32 overflows for dates from 2038 on.
      json.field "timestamp", timestamp.to_unix
    end
  end

  # Parses an event from the format written by `to_json`.
  #
  # The "timestamp" field is restored when it is present and an integer;
  # otherwise the event is stamped with the current time. Raises if the
  # document is not an object or lacks a string "topic" or a "payload" key.
  def self.from_json(json_str : String) : Event
    parsed = JSON.parse(json_str).as_h
    topic = parsed["topic"].as_s
    payload = parsed["payload"]
    if secs = parsed["timestamp"]?.try(&.as_i64?)
      Event.new(topic, payload, Time.unix(secs).to_local)
    else
      Event.new(topic, payload)
    end
  end
end
# Matches dot-separated topics against subscription patterns.
module TopicMatcher
  # Returns true when `topic` satisfies `pattern`.
  #
  # Rules, applied segment by segment after splitting both on ".":
  # * the whole pattern "*" matches every topic;
  # * a "#" segment matches everything from its position on, including
  #   the case where the topic has no segment left at that position;
  # * a "+" segment matches exactly one topic segment, whatever its text;
  # * any other segment must equal the topic segment at the same position.
  # Without a "#", pattern and topic must have the same number of segments.
  def self.matches?(pattern : String, topic : String) : Bool
    return true if pattern == "*"

    wanted = pattern.split(".")
    actual = topic.split(".")

    wanted.each_with_index do |segment, idx|
      return true if segment == "#"
      # The topic ran out before the pattern did.
      return false if idx >= actual.size
      return false unless segment == "+" || segment == actual[idx]
    end

    # Every pattern segment matched; the topic must not have extra segments.
    wanted.size == actual.size
  end
end
# A registered listener: a numeric id, the topic patterns it is interested
# in, and the channel on which matching events are handed to it.
class Subscriber
  getter id : Int32
  getter patterns : Array(String)
  getter ch : Channel(Event)

  # `buffer` is passed to `Channel(Event).new` as the channel capacity.
  def initialize(@id : Int32, @patterns : Array(String), buffer = 10)
    @ch = Channel(Event).new(buffer)
  end

  # True when at least one of this subscriber's patterns matches `topic`.
  def matches?(topic : String) : Bool
    @patterns.each do |candidate|
      return true if TopicMatcher.matches?(candidate, topic)
    end
    false
  end
end
# In-process publish/subscribe hub.
#
# Published events go through an unbuffered inbox channel to a dispatcher
# fiber, which forwards each event to the channel of every subscriber whose
# patterns match the topic. Each subscriber has its own fiber that receives
# from that channel and invokes the subscriber's handler.
class EventBus
  # Nilable so that it can be assigned last in `initialize`, after every
  # other instance variable is set and `self` may be captured by the fiber.
  @dispatcher_fiber : Fiber? = nil
  # Optional callback invoked when a subscriber handler raises.
  @error_handler : Proc(Subscriber, Event, Exception, Nil)? = nil

  def initialize
    @inbox = Channel(Event).new
    @subs = [] of Subscriber
    @mutex = Mutex.new
    @closing = false
    @next_sub_id = 1_i32
    @dispatcher_fiber = spawn { run }
  end

  # Hands `event` to the dispatcher. Blocks until the dispatcher receives it
  # (the inbox is unbuffered). Raises once shutdown has begun.
  def publish(event : Event)
    raise "EventBus is shutting down, cannot publish" if @closing
    @inbox.send(event)
  end

  # Registers `handler` for one pattern or a list of patterns and returns the
  # new Subscriber. `buffer` is the capacity of the subscriber's channel.
  # Raises if the bus is shutting down.
  def subscribe(patterns : String | Array(String), buffer = 10, &handler : Proc(Event, Nil))
    pattern_list = patterns.is_a?(String) ? [patterns] : patterns

    # The closing check, id allocation and registration share one critical
    # section so that a concurrent shutdown cannot slip in between them and
    # leave behind a subscriber whose channel is never closed.
    sub = @mutex.synchronize do
      raise "EventBus is shutting down" if @closing
      created = Subscriber.new(@next_sub_id, pattern_list, buffer)
      @next_sub_id += 1
      @subs << created
      created
    end

    spawn { consume(sub, handler) }
    sub
  end

  # Installs the callback invoked with (subscriber, event, exception) when a
  # handler raises. Without one, the error message is written to STDERR.
  def set_error_handler(&block : Proc(Subscriber, Event, Exception, Nil))
    @error_handler = block
  end

  # Removes `sub` from the bus and closes its channel, which ends its fiber
  # once the events already buffered in the channel have been handled.
  def unsubscribe(sub : Subscriber)
    @mutex.synchronize do
      @subs.delete(sub)
    end
    sub.ch.close
  end

  # Stops the bus: rejects new publishes/subscriptions, closes the inbox,
  # waits up to `timeout_seconds` for the dispatcher fiber to finish, then
  # closes and forgets every subscriber channel. Later calls are no-ops.
  def shutdown(timeout_seconds = 5_f32)
    @mutex.synchronize do
      return if @closing
      @closing = true
    end

    @inbox.close

    if fiber = @dispatcher_fiber
      started = Time.monotonic
      until fiber.dead?
        # total_seconds is the whole elapsed span; Span#seconds would only be
        # its 0..59 seconds component and never exceed a timeout >= 60.
        break if (Time.monotonic - started).total_seconds > timeout_seconds
        sleep(0.01.seconds)
      end
    end

    @mutex.synchronize do
      @subs.each(&.ch.close)
      @subs.clear
    end
  end

  # Dispatcher loop: runs until the inbox is closed by `shutdown`.
  private def run
    loop do
      event = @inbox.receive
      # Snapshot the recipients under the lock; deliver outside of it.
      recipients = @mutex.synchronize { @subs.select(&.matches?(event.topic)) }
      recipients.each { |sub| deliver(sub, event) }
    end
  rescue Channel::ClosedError
    # Inbox closed: normal termination.
  end

  # Sends `event` to `sub` from a separate fiber so that a subscriber with a
  # full channel does not stall the dispatcher.
  private def deliver(sub : Subscriber, event : Event)
    spawn do
      begin
        sub.ch.send(event)
      rescue Channel::ClosedError
        # The subscriber was removed or the bus shut down; drop the event.
      end
    end
  end

  # Subscriber loop: feeds every received event to `handler` until the
  # subscriber's channel is closed. Handler errors never end the loop.
  private def consume(sub : Subscriber, handler : Proc(Event, Nil))
    loop do
      event = sub.ch.receive
      begin
        handler.call(event)
      rescue ex
        report_handler_error(sub, event, ex)
      end
    end
  rescue Channel::ClosedError
    # Channel closed by unsubscribe/shutdown: normal termination.
  end

  # Routes a handler failure to the installed error handler, or to STDERR
  # when none is installed or the error handler itself raises.
  private def report_handler_error(sub : Subscriber, event : Event, ex : Exception)
    if on_error = @error_handler
      begin
        on_error.call(sub, event, ex)
      rescue err2
        STDERR.puts "Subscriber error handler failed: #{err2.message}"
      end
    else
      STDERR.puts "Subscriber ##{sub.id} handler error: #{ex.message}"
    end
  end
end
# Demo: wires a few subscribers to a bus, publishes sample events (including
# one whose handler raises), then shuts the bus down.
bus = EventBus.new

bus.set_error_handler do |failed_sub, failed_event, error|
  STDERR.puts "Error in subscriber ##{failed_sub.id} handling #{failed_event.topic}: #{error.class} - #{error.message}"
end

all = bus.subscribe("*") { |evt| puts "[*] #{evt.topic}: #{evt.payload.inspect}" }
user_any = bus.subscribe("user.#") { |evt| puts "[user.#] #{evt.topic}: #{evt.payload.inspect}" }
login_specific = bus.subscribe("user.login") { |evt| puts "[user.login] #{evt.payload.inspect}" }
single_level = bus.subscribe("order.+.created") { |evt| puts "[order.+.created] got #{evt.topic}" }

# Topic / raw JSON payload pairs, published in this order.
[
  {"user.signup", %({"email": "alice@example.com"})},
  {"user.signup.confirm", %({"email": "alice@example.com"})},
  {"user.login", %({"id": 123, "method": "oauth"})},
  {"order.us.created", %({"order_id": "o-1", "total": 42})},
  {"order.eu.created", %({"order_id": "o-2", "total": 55})},
].each do |topic, raw_json|
  bus.publish Event.new(topic, JSON.parse(raw_json))
end

# A handler that always raises, to exercise the error handler above.
bus.subscribe("bad.handler") { |evt| raise "boom for #{evt.topic}" }
bus.publish Event.new("bad.handler", JSON.parse(%(null)))

# Give the subscriber fibers a moment to run before stopping the bus.
sleep(0.1.seconds)
bus.shutdown(2.0)
puts "Shutdown complete."