四 月
10
火曜日

core.asyncベースのEvent Busを書いた

実験的にcore.asyncをベースとしたEvent Bus nijohando/event を書いた。
今回試したかったのは以下の3点。

  • core.async チャネルベースのインタフェース
  • REST API風なパスベースのイベント処理
  • Request-Replyパターンのサポート


core.asyncチャネルベースのインタフェース

core.asyncが提供する非同期プログラミングの枠組みに組み込むことで、I/O、バッファリング、タイムアウト、Transducersといったcore.asyncのファシリティを Clojure / ClojureScriptの両環境で利用できる利点がある。

イベントの発行

イベントの発行はEmitterチャネルへの書き込みで行う。
Emitterチャネルは任意のcore.asyncチャネルを emitize 関数でEventBusに紐付けることで作成する。

(require '[jp.nijohando.event :as ev]
         '[clojure.core.async :as ca])

(def bus (ev/bus))
(def emitter (ca/chan))
(ev/emitize bus emitter)
(ca/go
  (ca/>! emitter {:path "/messages/post"
                  :value {:name "taro"
                          :text "hello!"}}))

イベントの購読

同様にイベントの購読はListenerチャネルからの読み込みで行う。
Listenerチャネルは任意のcore.asyncチャネルを listen 関数でEventBusに紐付けることで作成する。
紐付ける際は購読するイベントのパスを指定する。

(def bus (ev/bus))
(def listener (ca/chan))
(ev/listen bus "/messages/post" listener)
(ca/go-loop []
  (when-some [{:keys [path value] :as event} (ca/<! listener)]
    (println "from:" (:name value) "msg:" (:text value)))
    (recur))

REST API風なパスベースのイベント処理

REST APIがリソースをパスで表すようにイベントをパスで表す。 RESTと違いパスはイベントを表す名詞に加えて操作や状態を表す動詞や形容詞を含む場合がある点。

;; 新規メッセージ作成要求イベント
{:path "/messages/post"
 :value {:name "taro"
         :value "hello!"}}

;; メッセージID10のリソースの削除要求イベント
{:path "/messages/10/delete"}

パスで表すことによりマッチ処理やパスパラメータの取得などにRouterライブラリを活用することができ、core.asyncの pub/sub と比べて柔軟なイベント購読ができるようになる利点がある。

;; 任意の(IDを持つ)メッセージの削除要求イベントを購読
(ev/listen bus "/messages/:id/delete" listener)
;; 任意の(IDを持つ)メッセージに対する全てのイベントを購読
(ev/listen bus "/messages/:id/*" listener)
;; メッセージの新規作成、削除要求イベントを購読
(ev/listen bus ["/messages" 
                 ["/post"]
                 ["/:id/delete"]] listener)

本ライブラリではRouterとして reitit-core を利用しているため、reititのルートシンタックスを利用することができる。

Requet-Replyパターン

基本的にイベントの流れはEmitterからListenerへ一方通行だがイベント処理をAPI/RPCのように関数的に利用したいケースもある。この場合Emitterは関数の呼び出し元、イベントは引数、Listenerが関数本体となるが、Listenerの処理結果を戻り値として呼び出し元に返す必要がある。

今回はEmitterチャネル毎にユニークなemitter-idを付与し、 イベントパス /emitters/:id/reply 宛てに応答を返すという設計にした。

Emitter側は emitize 関数で以下のように応答受信用チャネルを指定することができる。

(def emitter (ca/chan))
(def reply-ch (ca/chan))
;; Emitterチャネルと併せて応答受信用チャネルを指定する
(ev/emitize bus emitter reply-ch)

この応答受信チャネルは /emitters/<自身のemitter-id>/reply のListenerチャネルになっておりイベント発行後の応答を受信できる。

;; イベントを発行し処理結果を取得する
(ca/go
  (let [emitter (ca/chan)
        reply-ch (ca/chan)]
    (ev/emitize bus emitter reply-ch)
    (ca/>! emitter (ev/event "/messages/post"))
    (when-some [{:keys [value]} (ca/<! reply-ch)]
      (prn "message" (:msg-id value) "created!"))))

Listener側は受信したイベントから reply-to 関数で応答イベントを作成。それを発行することで応答を返す。

;; イベントを処理し結果を返す
(ca/go
  (let [emitter (ca/chan)  ;; 応答送信用のEmitter
        listener (ca/chan)
        msg-id (atom 0)]
    (ev/emitize bus emitter)
    (ev/listen bus "/messages/post" listener)
    (ca/go-loop []
      (when-some [event (ca/<! listener)]
        (let [reply (ev/reply-to event {:status :created
                                        :msg-id (swap! msg-id inc)})]
          (ca/>! emitter reply)
          (recur))))))

Emitterチャネルのスコープと応答

応答イベントを返す先はあくまでEmitter単位となるため、Emitterチャネルをどのスコープで維持するかによって応答の意味合いが変わってくる。

例えばイベント発行毎に新規にEmitterチャネルを作成する場合、emitter-id とイベントは1対1の関係となるためその応答もイベントに対する応答を意味する。 一方クライアント毎にEmitterチャネルを作成し利用し続ける場合、emitter-id とイベントは1対nの関係となるため応答が何に対してなのかはクライアント間の文脈次第になる。

最後に

とりあえず機能的にはこれだけあれば生きていけそうな気がするので一旦これで趣味プロジェクトに適用し塩梅を確認してく予定。