一 月
13
日曜日

send-offでディスパッチしたアクションが実行されないケースについて

Clojureagents機構にてsend-offでディスパッチしたアクションが呼ばれないケースに遭遇した際の調査メモ。


事象

(send-off agent (fn [_] (prn "fire!"))
(ca/<!! (ca/timeout 10000))

通常、send-offでディスパッチしたアクションは非同期かつほぼ即座に実行されるが、send-offの呼び出し以降でブロッキング処理があるとブロック中はアクションが実行されないケースに遭遇した。
上記の例ではタイムアウトチャネルで10秒間ブロックしているがこれをどれだけ長くしても結果は変わらない。
またこのような書き方をしてる全てのコードで発生するわけでなく、必ず再現するコードと全く再現しないコードに二分している。

ということで最小の再現コードを見つけ出し原因を調べてみることにした。

再現コード

最小の再現コードは以下の通り。

(require '[clojure.core.async :as ca])

(def a (agent nil))
(def b (agent nil))
(def ch (ca/chan 1))

(send-off a (fn [_] ;;関数A
                (send-off b (fn [_] ;;関数B
                              (prn "fire!")))
                (ca/<!! (ca/timeout 10000)))

再現のポイント

ポイントはネストしたsend-offの呼び出しだった。
最小化した再現コードでは露骨に send-off が入れ子になり狂気を感じてしまうが一旦そこは無視してほしい。
重要なのは見た目がネストしていることではなく、二回目のsend-offとブロッキング処理がAgentスレッドプールのスレッド上で行われる点にある。

コードの挙動を確認する

上記コードは以下のように動くものだと想定していた。

  1. agent a 対する send-off で関数Aが登録。
  2. 関数Aが非同期に Agentスレッド1 で実行される。
  3. Agentスレッド1 上にてagent b に対する send-off で関数Bが登録。
  4. Agentスレッド1 上にて関数Aはタイムアウトチャネルがタイムアウトするまで待機する。
  5. 登録した関数Bが非同期にagentスレッド2 で実行される。

が、実際の動きは以下となる(1〜4までは同じ、5が異なる)

  1. agent a 対する send-off で関数Aが登録。
  2. 関数Aが非同期に Agentスレッド1 で実行される。
  3. Agentスレッド1 上にてagent b に対する send-off で関数Bが登録。
  4. Agentスレッド1 上にて関数Aはタイムアウトチャネルがタイムアウトするまで待機する。
  5. 登録した関数Bは実行されない (タイムアウト後に実行される)

理由

Clojure本体のAgent周りのコードを読むとこの挙動の理由がわかる。

アクションのディスパッチ

https://github.com/clojure/clojure/blob/28b87d53909774af28f9f9ba6dfa2d4b94194a57/src/jvm/clojure/lang/Agent.java#L246-L256

static void dispatchAction(Action action) {
  LockingTransaction trans = LockingTransaction.getRunning();
  if (trans != null) {
    trans.enqueue(action);
  } else if(nested.get() != null) {
    nested.set(nested.get().cons(action));
  } else {
    action.agent.enqueue(action);
  }
}

clojure.lang.Agent.dispatchActionsend-off関数内から利用されアクションをディスパッチするためのメソッドだが、実行時のコンテクストに応じてディスパッチ方法が異なっている。


LockingTransactionが存在している場合

通常、dosyncを使用したトランザクション処理はSTMによってリトライされる可能性があるため副作用を伴う処理は厳禁だが、dosync内でagentを利用することでトランザクションが確定した際に一度だけアクションが実行されることが保証される。これによりdoysnc + agentで副作用を伴う処理を記述することができる。
LockingTrasactionが存在する場合はこの分岐に入るが今回の事象とは関係ない。

スレッドローカル変数 nested が存在している場合

このスレッドローカル変数はアクションがagentスレッド上で実行されている間のみ非null(リストが設定)となり、同スレッド上からさらにアクションがディスパッチされた場合はリストにネストしたアクションが追加される。
今回の再現ケースでは関数Bのディスパッチがこの分岐で行われることになるが、ここでのネストしたアクションの扱いが今回の事象と関係している。
このリストに追加されたアクションの実行タイミングを調べることで今回の挙動の理由がわかる。

それ以外の場合

トランザクションもネストもない素の状態でアクションをディスパッチした場合で今回の再現ケースでは関数Aのディスパッチはこの分岐で行われる。

ネストしたアクションの実行タイミング

ネストしたアクションはスレッドローカル変数 nested にリストとして積まれることが分かったので、次はこれらがどのタイミングで実行されるかを確認する。

https://github.com/clojure/clojure/blob/28b87d53909774af28f9f9ba6dfa2d4b94194a57/src/jvm/clojure/lang/Agent.java#L275-L286

static public int releasePendingSends() {
  IPersistentVector sends = nested.get();
  if(sends == null) {
    return 0;
  }
  for(int i=0;i<sends.count();i++) {
    Action a = (Action) sends.valAt(i);
    a.agent.enqueue(a);
  }
  nested.set(PersistentVector.EMPTY);
  return sends.count();
}

clojure.lang.Agent.releasePendingSendsnested リストに積まれたネストしたアクションを実行する。
このメソッドはclojure.lang.Agent.Action.doRunからアクション実行時の後処理として呼び出される。

https://github.com/clojure/clojure/blob/28b87d53909774af28f9f9ba6dfa2d4b94194a57/src/jvm/clojure/lang/Agent.java#L105-L126

static void doRun(Action action){
  try {
    nested.set(PersistentVector.EMPTY);

    Throwable error = null;
    try {
      Object oldval = action.agent.state;
      Object newval =  action.fn.applyTo(RT.cons(action.agent.state, action.args));
      action.agent.setState(newval);
      action.agent.notifyWatches(oldval,newval);
    } catch(Throwable e) {
      error = e;
    }

    if(error == null) {
      releasePendingSends();
    }
    ・
    ・
    ・

この事から ネストした全てのアクションは大元(一番外側)のアクション実行完了後に一括で実行される ことがわかる。

まとめ

send-offの呼び出し以降でブロッキング処理があるとブロック中はアクションが実行されないケースに遭遇した。

ネストしたアクションのディスパッチを行っていた。
大元のアクション内にブロッキング処理がありアクションが完了しないため後処理が動かずネストしたアクションの実行も行われない状態になっていた。

全てのコードで発生するわけでなく、必ず再現するコードと全く再現しないコードに二分している。

再現するコードは呼び出し元で既にアクションを実行し実行スレッドがAgentスレッドになってた。
このため以降のディスパッチしたアクションは全てネストしたアクションとなり大元のアクションが完了するまで実行されないようになっていた。

再現しないコードは呼び出し元がAgentスレッドではないため、アクションは即座に実行されていた。