gitfoss | 7dd58d39e1ef0cda0ae169a34c68def7f7f2961f | packages/gitfoss-ci-runner/src/event_bus.cr ∙ GitFOSS
.cr
Crystal
(text/x-crystal)
# Event bus with graceful shutdown, wildcard topic matching
# serialization helpers, and error handling
require "json"
require "mutex"

# Payload type alias
alias Payload = JSON::Any

struct Event
  getter topic : String
  getter payload : Payload
  getter timestamp : Time = Time.local

  def initialize(topic : String, payload : Payload)
    @topic = topic
    @payload = payload
    @timestamp = Time.local
  end

  # Serialization helper -> JSON string
  def to_json : String
    JSON.build do |j|
      j.object do
        j.field "topic", topic
        j.field "payload", payload_to_json(payload)
        j.field "timestamp", timestamp.to_unix.to_i
      end
    end
  end

  # Parse from JSON string (expects same shape)
  def self.from_json(json_str : String) : Event
    parsed = JSON.parse(json_str).as_h
    topic = parsed["topic"].as_s
    payload = json_to_payload(parsed["payload"])
    Event.new(topic, payload)
  end

  private def payload_to_json(p)
    case p
    when String, Int32, Float32, Bool, Nil
      p
    when JSON::Any
      p
    when Array
      p.as_a.map(&.payload_to_json)
    when Hash
      h = {} of String => JSON::Any
      p.as_h.each { |k, v| h[k] = payload_to_json(v) }
      h
    else
      raise "Unsupported payload type: #{p.class}"
    end
  end

  private def self.json_to_payload(j) : Payload
    j
    # JSON::Any -> convert to allowed Payload structures
    # case j
    # when JSON::Any
    #   case j
    #   when j.is_a?(String) then j.as_s
    #   when j.is_a?(Int) then j.as_i
    #   when j.is_a?(Float) then j.as_f32
    #   when j.is_a?(Bool) then j.as_bool
    #   when j.is_a?(Nil) then nil
    #   when j.is_a?(Array)
    #     j.as_a.map { |el| json_to_payload(el).as(JSON::Any) }
    #   when j.is_a?(Hash)
    #     res = {} of String => Payload
    #     j.as_h.each { |k, v| res[k] = json_to_payload(v).as(JSON::Any) }
    #     res
    #   else
    #     raise "Unsupported JSON::Any variant"
    #   end
    # else
    #   # primitive Crystal types (if already converted)
    #   j
    # end
  end
end

# Simple wildcard matcher:
# - "*" matches any single topic exactly "*"
# Pattern matchers:
# - "#" as multi-level wildcard (matches any suffix, e.g., "user.#" matches "user.login", "user.signup.confirm")
# - "+" as single-level wildcard (matches exactly one token between dots, e.g., "user.+.created")
module TopicMatcher
  def self.matches?(pattern : String, topic : String) : Bool
    return true if pattern == "*"
    pat_tokens = pattern.split(".")
    t_tokens = topic.split(".")

    i = 0
    j = 0
    while i < pat_tokens.size && j < t_tokens.size
      pt = pat_tokens[i]
      if pt == "#"
        return true
      elsif pt == "+"
        i += 1
        j += 1
        next
      elsif pt == t_tokens[j]
        i += 1
        j += 1
        next
      else
        return false
      end
    end

    # consume remaining pattern tokens: if only trailing "#" remain, match
    while i < pat_tokens.size
      return true if pat_tokens[i] == "#"
      break
    end

    i == pat_tokens.size && j == t_tokens.size
  end
end

class Subscriber
  getter patterns : Array(String)
  getter ch : Channel(Event)
  getter id : Int32

  def initialize(id : Int32, patterns : Array(String), buffer = 10)
    @id = id
    @patterns = patterns
    @ch = Channel(Event).new(buffer)
  end

  def matches?(topic : String) : Bool
    @patterns.any? { |pat| TopicMatcher.matches?(pat, topic) }
  end
end

class EventBus
  @dispatcher_fiber : Fiber

  def initialize
    @inbox = Channel(Event).new
    @subs = [] of Subscriber
    @mutex = Mutex.new
    @closing = false
    @next_sub_id = 1_i32
    @dispatcher_fiber = spawn run
  end

  def publish(event : Event)
    raise "EventBus is shutting down, cannot publish" if @closing
    @inbox.send(event)
  end

  # subscribe accepts a pattern string (or array of patterns) using + and # like MQTT
  # block handler runs in a spawned fiber; errors from handler are caught and forwarded to error_handler if set
  def subscribe(patterns : String | Array(String), buffer = 10, &handler : Proc(Event, Nil))
    pattern_list = patterns.is_a?(String) ? [patterns] : patterns
    @mutex.synchronize do
      raise "EventBus is shutting down" if @closing
    end
    id = @next_sub_id
    @next_sub_id += 1
    sub = Subscriber.new(id, pattern_list, buffer)
    @mutex.synchronize { @subs << sub }

    spawn do
      begin
        loop do
          e = sub.ch.receive
          begin
            handler.call(e)
          rescue ex
            # send to configured error handler if available, else print
            if @error_handler
              begin
                @error_handler.not_nil!.call(sub, e, ex)
              rescue err2
                STDERR.puts "Subscriber error handler failed: #{err2.message}"
              end
            else
              STDERR.puts "Subscriber ##{sub.id} handler error: #{ex.message}"
            end
          end
        end
      rescue Channel::ClosedError
        # exit
      end
    end

    sub
  end

  # Allows setting a centralized error handler: Proc(Subscriber, Event, Exception)
  def set_error_handler(&block : Proc(Subscriber, Event, Exception, Nil))
    @error_handler = block
  end

  def unsubscribe(sub : Subscriber)
    @mutex.synchronize do
      @subs.delete(sub)
    end
    sub.ch.close
  end

  # Graceful shutdown:
  # - stop accepting new publishes
  # - close inbox so dispatcher drains
  # - wait for dispatcher to finish dispatching
  # - close all subscriber channels (so subscriber handler fibers exit)
  def shutdown(timeout_seconds = 5_f32)
    @mutex.synchronize do
      return if @closing
      @closing = true
    end

    begin
      @inbox.close
    rescue
    end

    # wait for dispatcher fiber to finish (simple busy-wait with timeout)
    started = Time.local
    while @dispatcher_fiber.dead? == false
      break if (Time.local - started).seconds > timeout_seconds
      sleep(0.01.seconds)
    end

    # close subscribers
    @mutex.synchronize do
      @subs.each { |s| s.ch.close rescue nil }
      @subs.clear
    end
  end

  private def run
    loop do
      e = @inbox.receive
      @mutex.synchronize do
        # dispatch to matching subscribers
        @subs.each do |sub|
          if sub.matches?(e.topic)
            # non-blocking dispatch: send in spawned fiber, ignore if closed
            spawn { sub.ch.send(e) rescue nil }
          end
        end
      end
    end
  rescue Channel::ClosedError
    # inbox closed -> exit dispatcher
  end
end

# ---------------- Example usage ----------------
bus = EventBus.new

# Set centralized error handler
bus.set_error_handler do |sub, event, ex|
  STDERR.puts "Error in subscriber ##{sub.id} handling #{event.topic}: #{ex.class} - #{ex.message}"
end

# Subscribe with MQTT-style patterns
all = bus.subscribe("*") do |e|
  puts "[*] #{e.topic}: #{e.payload.inspect}"
end
user_any = bus.subscribe("user.#") do |e|
  puts "[user.#] #{e.topic}: #{e.payload.inspect}"
end
login_specific = bus.subscribe("user.login") do |e|
  puts "[user.login] #{e.payload.inspect}"
end
single_level = bus.subscribe("order.+.created") do |e|
  puts "[order.+.created] got #{e.topic}"
end

# Publish a variety of events (using typed payloads)
bus.publish Event.new("user.signup", JSON.parse(%({"email": "alice@example.com"})))
bus.publish Event.new("user.signup.confirm", JSON.parse(%({"email": "alice@example.com"})))
bus.publish Event.new("user.login", JSON.parse(%({"id": 123, "method": "oauth"})))
bus.publish Event.new("order.us.created", JSON.parse(%({"order_id": "o-1", "total": 42})))
bus.publish Event.new("order.eu.created", JSON.parse(%({"order_id": "o-2", "total": 55})))

# Demonstrate serialization helpers
# e = Event.new("meta", JSON.parse(%({"k":"v","n":1})))
# json_str = e.to_json
# puts "Serialized event: #{json_str}"
# parsed = Event.from_json(json_str)
# puts "Parsed event topic: #{parsed.topic}, payload: #{parsed.payload.inspect}"

# Trigger an error in a handler to show central error handling
bus.subscribe("bad.handler") do |ev|
  raise "boom for #{ev.topic}"
end

bus.publish Event.new("bad.handler", JSON.parse(%(null)))

sleep(0.1.seconds)

# Graceful shutdown
bus.shutdown(2.0)
puts "Shutdown complete."