こんにちは、びしょ~じょです。 最近『並行プログラミング入門』を読みました。

Amazonで高野 祐輝の並行プログラミング入門 ―Rust、C、アセンブリによる実装からのアプローチ。アマゾンならポイント還元本が多数。高野 祐輝作品ほか、お急ぎ便対象商品は当日お届けも可能。また並行プログラミング入門 ―Rust、C、アセンブリによる実装からのアプローチもアマゾン配送商品なら通常配送無料。

並行計算のためのCPU命令やアルゴリズムの説明が順序立てておこなわれていて、わかりやすく面白かったです。 初版は結構エラッタが多かったので、みなさんは最新版や電子書籍などで買ってください。

個人的には、プログラムがアルゴリズム記述用の擬似言語だともう少しアルゴリズムを追いやすかったかなと思いました。 もちろんRustによる記述はそれがそのまま動かせるという利点はとても良いです。 あと~pi計算の説明でメタ変数がαとセリフ・サンセリフのaが極めて近いセンテンスで使われるなど大変な部分があったので、そこは気になった。

とにかく並行プログランミに熱が出たんで、別の熱源のOCaml5.0でぶち上げる。

Domainslibのチャンネル

OCaml5.0が22年12月にリリースされて早くも3ヶ月くらい経つ。 新たなバージョンでは並列並行計算プリミティブが組み込まれた話を1年前にした。

こんにちは、びしょ〜じょです。これはMeta Languages Advent Calendar 2021の3日目の記事です。今日は12月3日、冴草きいちゃんの誕生日です。いいね? めでたいです...

Domainモジュールで並列プリミティブが使えるが、低レベルAPIで少々使いづらいので、ocaml-multicoreが提供するdomainslibを使うのが推奨されている1

Parallel Programming over Domains. Contribute to ocaml-multicore/domainslib development by creating an account on GitHub.

そんなdomainslibにはthread-safeなチャンネルがある。

channels example
utop # #require "domainslib";;
utop # module Chan  = Domainslib.Chan;;
module Chan = Domainslib.Chan
utop # let c = Chan.make_bounded 2;; (* buffer size 2のチャンネル *)
val c : '_weak1 Chan.t = <abstr>
utop # Chan.recv_poll c;; (* `*_poll`でノンブロッキングにsend/recvする *)
- : '_weak2 option = None
utop # Chan.send_poll c 10;;
- : bool = true
utop # Chan.send_poll c 20;;
- : bool = true
utop # Chan.send_poll c 21;; (* buffer sizeの上限に達するのでsend失敗 *)
- : bool = false
utop # Chan.recv_poll c;; (* FILO *)
- : int option = Some 10
utop # Chan.recv_poll c;;
- : int option = Some 20
utop # Chan.recv_poll c;;
- : int option = None

Eio

ところで! 並行計算もできるようになったんで、そのwrapperであるeioも、あります。

Effects-based direct-style IO for multicore OCaml. Contribute to ocaml-multicore/eio development by creating an account on GitHub.

networkingやclock、fileなどのIOを非同期に、さらにLwtAsyncのようにmonadicではなくdirect-styleで書くことができる。 非同期IOバックエンドにはlibuvとio_uringを採用しており、プラットフォームに応じて選択できる。 『並行プログラミング入門』でいうところのプロセスはeffect handlersが切り取るcontinuationに相当し(おかげでdirect-styleに書ける)、API上は隠蔽されている。

async example
utop # let main ~clock ~stdout =
         Eio.Fiber.both
           (fun () ->
             for _ = 1 to 5 do
               Eio.Flow.copy_string "hello\n" stdout;
               Eio.Time.sleep clock 0.1
             done)
           (fun () ->
             for _ = 1 to 5 do
               Eio.Flow.copy_string "world\n" stdout;
               Eio.Time.sleep clock 0.2
             done)
       ;;
val main : clock:#Eio.Time.clock -> stdout:#Eio.Flow.sink -> unit = <fun>
utop # let run main =
         Eio_main.run @@ fun env -> (* プラットフォームに応じた`env`オブジェクトが渡される *)
         let t1 = Eio.Time.now env#clock in
         let () = main ~clock:env#clock ~stdout:env#stdout in
         let t2 = Eio.Time.now env#clock in
         Eio.Flow.copy_string (Printf.sprintf "%f\n" (t2 -. t1)) env#stdout
       ;;
val run : (clock:Eio.Time.clock -> stdout:Eio.Flow.sink -> unit) -> unit =
  <fun>
utop # run main;;
hello
world
hello
world
hello
hello
world
hello
world
world
1.009868
- : unit = ()

ええやん。 Eio.Time.sleepが非同期-capableなので0.1*5秒と0.2*5秒別々にsleepしても合計実行時間は1.0秒になる。

非同期なチャンネルがほしいんですが

では、domainslibのチャンネルに非同期APIはあるかというと、ない。 とはいえできそうな雰囲気はあるんで、やりますか。

パッと思いつくイメージのままに、*_pollしてsend/recv失敗すればrerunすればいいだろう。

chan_async.ml (1)
module C = Domainslib.Chan

let make () = C.make_unbound ()

let rec recv c =
  match C.recv_poll c with
  | Some v -> v
  | None ->
    Eio.Fiber.yield ();
    recv ()
;;

let rec send c v =
  if C.send_poll c v
  then ()
  else (
    Eio.Fiber.yield ();
    send ())
;;

(* 便利やつ *)
module Syntax = struct
  let ( <~ ) c v = send c v
  let ( ~> ) c = recv c
end

無難な気がする。 *_pollで一回トライして、ダメだったら一旦yieldして他のプロセスに実行を移す。 プロセス再開時にリトライする。 たしかに非同期っぽい。

Go as Example

ええ感じな例として、Go by Exampleの例をやってみる。

Go by Example: Worker Poolsより引用
package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

え、何? このブログでGoの話が出るのは4年ぶりなんで全くわかりません。 チャンネルに対するrange文は、渡されたチャンネルから値をreceiveしてiterateするらしい。 なんつーかrangeっていうkeywordでやるんだ……て感じだが、keywordsを圧縮したい気持ちも分からなくもない。 <-がsend/recvの演算子で、チャンネルがlhsにあるかrhsにあるかでsend/recvが決まるらしい。

go keywordで関数をgoroutineに突っ込んで非同期に呼び出せる。 さらに、Go 1.14からgoroutineはpreemptiveになったので、yieldなどを手動で挿入する必要がない。 これは嬉しい。

[欅樹雑記: Go 1.14からgoroutineがプリエンプティブになったらしい](https://blog.zelkova.cc/2021/08/go1.14-asynchronous%20preemption.html)

これが1.14以降では、

  1. 関数の実行を監視しているモニター(sysmon)は、長時間実行されているgoroutineに対してpreemptフラグを立てる←これは1.13でも同じ

  2. 該当のgoroutineを処理しているスケジューラーにSIGURGというシグナルを送る

  3. スケジューラーはSIGURGを受け取ったら、別のgoroutine(gsignal)を起動して実行しているgoroutineの代わりに割り付ける

  4. gsignalはpreemptフラグが立っている場合に自分自身を停止する

へーですね。 先程のChan_asyncrangeを実装すると、

chan_async.ml(cont'd)
let[@warning "-nonreturning-statement"] rec recv_forever c f =
  let open Syntax in
  let v = ~>c in
  ignore @@ f v;
  recv_forever ()
;;

うん、普通だ。 syntacticに分岐がないので(実際にプロセス上はループを脱出できない)nonreturning statementで怒られるため、supressしている。

え~もうできたやん、使いましょう。

utop # let worker ~clock ~stdout id jobs results =
  let open Chan_async.Syntax in
  Chan_async.recv_forever jobs
  @@ fun j ->
  Eio.Flow.copy_string (Printf.sprintf "worker %d started job %d\n" id j) stdout;
  Eio.Time.sleep clock 1.;
  Eio.Flow.copy_string (Printf.sprintf "worker %d finished job %d\n" id j) stdout;
  results <~ j * 2
;;
val worker :
  clock:#Eio.Time.clock ->
  stdout:#Eio.Flow.sink -> int -> int Chan_async.t -> int Chan_async.t -> unit =
  <fun>

Chan_async.recv_forever jobs @@ fun j -> ...for j := range jobs { ... }と読み替えるともとの通りに読める。

utop # let main sw ~clock ~stdout =
  let open Chan_async.Syntax in
  let jobs = Chan_async.make () in
  let results = Chan_async.make () in
  let num_jobs = 5 in
  let start = Eio.Time.now clock in
  for w = 1 to 3 do
    Eio.Fiber.fork ~sw
    @@ fun () -> worker ~clock ~stdout w jobs results
  done;
  for j = 1 to num_jobs do
    jobs <~ j
  done;
  for _ = 1 to num_jobs do
    ignore @@ ~>results
  done;
  let end' = Eio.Time.now clock in
  Eio.Flow.copy_string (Printf.sprintf "%f sec\n" (end' -. start)) stdout
;;
val main :
  Eio.Switch.t -> clock:#Eio.Time.clock -> stdout:#Eio.Flow.sink -> unit =
  <fun>
utop # let () =
         Eio_main.run
         @@ fun env -> Eio.Switch.run
         @@ main ~clock:env#clock ~stdout:env#stdout
;;
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
2.000017 sec
^CUncaught exception in run loop:
Exception: Stdlib.Sys.Break
No backtrace available.
Exception:
Failure
 "Deadlock detected: no events scheduled but main function hasn't returned".

なんか止まらないんでCtrl+Cしたらデッドロックしたんですけど! まあrecv_foreverが止まらないからっすね。 Goの例ではチャンネルをちゃんとcloseしていたのでworkerが止まった2が、OCamlの実装だと止まらない。

closeする

ということで真面目にcloseに向き合う。 domainslibはチャンネルに対するclose操作を提供していない。 とりあえず、closeするとフラグを立てて、フラグ立ってたらsend/recv止まるようにしよう。

chan_async.ml(2)
module Chan_async = struct
  module C = Domainslib.Chan

  type 'a t =
    { chan : 'a C.t
    ; closed : bool Atomic.t
    }

  let make () = { chan = C.make_unbounded (); closed = Atomic.make false }

  exception Recv_closed

フラグとしてclosedを持つ型'a tを定義する。 thread-safeにしたいので、4.12から導入されたAtomicコンテナを使う3

天下り式に書くと、recvしたときすでにclosedだと値を返しようがないので、適当な型の代わりに例外を発生させる。

chan_async.ml(2,cont'd)
  let recv t =
    match C.recv_poll t.chan with
    | Some v -> v
    | None ->
      if Atomic.get t.closed then raise Recv_closed else Eio.Fiber.yield ();
      recv t
  ;;

  let rec send t v =
    if C.send_poll t.chan v
    then ()
    else (
      if Atomic.get t.closed then () else Eio.Fiber.yield ();
      send t v)
  ;;

  let close t = Atomic.set t.closed true

  let rec recv_forever t f =
    try
      let v = recv t in
      ignore @@ f v;
      recv_forever t f
    with
    | Recv_closed -> ()
  ;;
end

他はまったく考えた通り。 recv_foreverRecv_closed例外を受ければループから抜けられる。

いいんじゃないでしょうか。

utop # let () =
         Eio_main.run
         @@ fun env -> Eio.Switch.run
         @@ main ~clock:env#clock ~stdout:env#stdout
;;
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
2.000396 sec
utop # 

よかったです。

ちなむと

Eioのexampleにあるworker poolのほうがeioっぽく書ける。 非同期なチャンネルはとりあえずEio.Streamを使ってくださいとのことです。

Eio探訪記を書くつもりだったが、まあメジャーリリースされてからにするか。


  1. "We recommend the users to utilise higher-level parallel programming libraries such as domainslib.", https://v2.ocaml.org/releases/5.0/manual/parallelism.html 

  2. 実はウソで、preemptiveなおかげか、closeの行を消しても、チャンネルに対するrangeが受けているときに他のプロセスが終了して値がやってこないことが分かるとプロセス全体が終了する 

  3. 実装を見ると確かにthread-safeな命令を使っている