Module cittadino

cittadino: a simple, opinionated pubsub framework

cittadino is a simple, small library that implements a pubsub system over STOMP.

It exposes a single main object type, PubSub, with its constructor, newPubSub(), which provides two main procs: publishModelEvent() and subscribe(). These procedures take care of publishing and subscribing, respectively. Subscriber procedures can be registered with subscribe() to certain AMQP/STOMP-style topic patterns, and arbitrary JSON messages can be sent to routing keys with publishModelEvent().

In order process incoming events, call run() inside of a process. That process will loop and block, waiting for incoming messages. Any topics that match the patterns that were registered with subscribe() will have their messages passed to the handler procedures.

Procs

proc newPubSub(stompUrl, modelExchangeName: string; nameSpace = ""): PubSub {.
raises: [OSError, OverflowError, StompError, ValueError], tags: []
.}

Create a new PubSub object. Given the URL of a STOMP server and the name of the exchange to listen to, will create and manage a STOMP connection, optionally suffixed with nameSpace.

The URL should be a fully qualified STOMP URI: starting with stomp:// or stomp+ssl:// including necessary user/password information, including a vhost at the end (in the examples, the default /), and finally ending with query parameters to encode connection options.

Currently the Nim STOMP library understands a single query parameter: heartbeat=<interval>, which will request a heartbeat from the STOMP server every interval seconds.

Examples:
var pubsub = newPubSub("stomp://user:user@192.168.111.222/", "model_exchange")
Examples:
var pubsub = newPubSub("stomp://user:user@192.168.111.222/?heartbeat=10", "amq_ex")
Examples:
var pubsub = newPubSub("stomp+ssl://user:user@192.168.111.222/", "my_exchange")
proc connectedCallback=(pubsub: PubSub; callback: StompCallback) {.
raises: [], tags: []
.}
Set the callback procedure to be called whenever pubsub connects to its server.
proc heartbeatCallback=(pubsub: PubSub; callback: StompCallback) {.
raises: [], tags: []
.}
Set the callback procedure to be called whenever pubsub receives its heartbeat from the STOMP server.
proc errorCallback=(pubsub: PubSub; callback: StompCallback) {.
raises: [], tags: []
.}
Set the callback procedure to be called whenever pubsub receives a error from the STOMP server.
proc subscribe(pubsub: var PubSub; topic: string; callback: PubSubCallback;
              autoDelete = false) {.
raises: [Exception, KeyError, RegexError, OSError, SslError, TimeoutError, AssertionError, OverflowError, ValueError, StompError], tags: [ RootEffect, ReadIOEffect, WriteIOEffect, TimeEffect]
.}

Register a callback procedure against a subscription pattern. callback whenever a message is received whose destination matches against topic.

Destinations are declared with durable:true, which specifies that if the broker is restarted, the queue should be retained. By default they are also created with auto-delete:false, which specifies that the queue should not be destroyed if there is no active consumer (ie, published messages will still be routed to the queue and consumed when run() is called again). This can be toggled with the autoDelete parameter.

Examples:
proc handler(json: JsonNode) =
  echo "Got a new message!"

var pubsub = newPubSub("stomp://user:user@192.168.111.222/", "model_exchange")
pubsub.subscribe("user.*", handler)
proc run(pubsub: PubSub) {.
raises: [OSError, SslError, TimeoutError, AssertionError, OverflowError, ValueError, Exception, StompError, StompError, Exception, SslError, OSError, TimeoutError, AssertionError, OverflowError, ValueError], tags: [ReadIOEffect, WriteIOEffect, TimeEffect, RootEffect]
.}
Run forever, listening for messages from the STOMP server. When one is received, any handlers registered with subscribe() will be called on the message's payload.
proc publishModelEvent(pubsub: PubSub; model_name, event_name: string; obj: JsonNode) {.
raises: [ OSError, SslError, TimeoutError, AssertionError, OverflowError, ValueError, Exception, StompError, StompError, SslError, OSError, ValueError], tags: [ReadIOEffect, WriteIOEffect, TimeEffect, RootEffect]
.}
Publish an event to pubsub's model event exchange. It will be routed towards model_name.``event_name``, eg, user.saved. obj will be serialized into the message payload. Examples:
var
  pubsub = newPubSub("stomp://user:user@192.168.111.222/", "model_exchange")
  jsonObj = %*{"id": 1}
pubsub.publishModelEvent("user", "saved", jsonObj)