OCamlで非同期ちゃんねる
こんにちは、びしょ~じょです。 最近『並行プログラミング入門』を読みました。
並行計算のためのCPU命令やアルゴリズムの説明が順序立てておこなわれていて、わかりやすく面白かったです。 初版は結構エラッタが多かったので、みなさんは最新版や電子書籍などで買ってください。
個人的には、プログラムがアルゴリズム記述用の擬似言語だともう少しアルゴリズムを追いやすかったかなと思いました。
もちろんRustによる記述はそれがそのまま動かせるという利点はとても良いです。
あと~pi計算の説明でメタ変数がα
とセリフ・サンセリフのa
が極めて近いセンテンスで使われるなど大変な部分があったので、そこは気になった。
とにかく並行プログランミに熱が出たんで、別の熱源のOCaml5.0でぶち上げる。
Domainslibのチャンネル
OCaml5.0が22年12月にリリースされて早くも3ヶ月くらい経つ。 新たなバージョンでは並列並行計算プリミティブが組み込まれた話を1年前にした。
Domain
モジュールで並列プリミティブが使えるが、低レベルAPIで少々使いづらいので、ocaml-multicoreが提供するdomainslibを使うのが推奨されている1。
そんなdomainslibにはthread-safeなチャンネルがある。
utop # #require "domainslib";;
utop # module Chan = Domainslib.Chan;;
module Chan = Domainslib.Chan
utop # let c = Chan.make_bounded 2;; (* buffer size 2のチャンネル *)
val c : '_weak1 Chan.t = <abstr>
utop # Chan.recv_poll c;; (* `*_poll`でノンブロッキングにsend/recvする *)
- : '_weak2 option = None
utop # Chan.send_poll c 10;;
- : bool = true
utop # Chan.send_poll c 20;;
- : bool = true
utop # Chan.send_poll c 21;; (* buffer sizeの上限に達するのでsend失敗 *)
- : bool = false
utop # Chan.recv_poll c;; (* FILO *)
- : int option = Some 10
utop # Chan.recv_poll c;;
- : int option = Some 20
utop # Chan.recv_poll c;;
- : int option = None
Eio
ところで! 並行計算もできるようになったんで、そのwrapperであるeioも、あります。
networkingやclock、fileなどのIOを非同期に、さらにLwtやAsyncのようにmonadicではなくdirect-styleで書くことができる。 非同期IOバックエンドにはlibuvとio_uringを採用しており、プラットフォームに応じて選択できる。 『並行プログラミング入門』でいうところのプロセスはeffect handlersが切り取るcontinuationに相当し(おかげでdirect-styleに書ける)、API上は隠蔽されている。
utop # let main ~clock ~stdout =
Eio.Fiber.both
(fun () ->
for _ = 1 to 5 do
Eio.Flow.copy_string "hello\n" stdout;
Eio.Time.sleep clock 0.1
done)
(fun () ->
for _ = 1 to 5 do
Eio.Flow.copy_string "world\n" stdout;
Eio.Time.sleep clock 0.2
done)
;;
val main : clock:#Eio.Time.clock -> stdout:#Eio.Flow.sink -> unit = <fun>
utop # let run main =
Eio_main.run @@ fun env -> (* プラットフォームに応じた`env`オブジェクトが渡される *)
let t1 = Eio.Time.now env#clock in
let () = main ~clock:env#clock ~stdout:env#stdout in
let t2 = Eio.Time.now env#clock in
Eio.Flow.copy_string (Printf.sprintf "%f\n" (t2 -. t1)) env#stdout
;;
val run : (clock:Eio.Time.clock -> stdout:Eio.Flow.sink -> unit) -> unit =
<fun>
utop # run main;;
hello
world
hello
world
hello
hello
world
hello
world
world
1.009868
- : unit = ()
ええやん。
Eio.Time.sleep
が非同期-capableなので0.1*5秒と0.2*5秒別々にsleepしても合計実行時間は1.0秒になる。
非同期なチャンネルがほしいんですが
では、domainslibのチャンネルに非同期APIはあるかというと、ない。 とはいえできそうな雰囲気はあるんで、やりますか。
パッと思いつくイメージのままに、*_poll
してsend/recv失敗すればrerunすればいいだろう。
module C = Domainslib.Chan
let make () = C.make_unbound ()
let rec recv c =
match C.recv_poll c with
| Some v -> v
| None ->
Eio.Fiber.yield ();
recv ()
;;
let rec send c v =
if C.send_poll c v
then ()
else (
Eio.Fiber.yield ();
send ())
;;
(* 便利やつ *)
module Syntax = struct
let ( <~ ) c v = send c v
let ( ~> ) c = recv c
end
無難な気がする。
*_poll
で一回トライして、ダメだったら一旦yield
して他のプロセスに実行を移す。
プロセス再開時にリトライする。
たしかに非同期っぽい。
Go as Example
ええ感じな例として、Go by Exampleの例をやってみる。
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}
え、何?
このブログでGoの話が出るのは4年ぶりなんで全くわかりません。
チャンネルに対するrange
文は、渡されたチャンネルから値をreceiveしてiterateするらしい。
なんつーかrange
っていうkeywordでやるんだ……て感じだが、keywordsを圧縮したい気持ちも分からなくもない。
<-
がsend/recvの演算子で、チャンネルがlhsにあるかrhsにあるかでsend/recvが決まるらしい。
go
keywordで関数をgoroutineに突っ込んで非同期に呼び出せる。
さらに、Go 1.14からgoroutineはpreemptiveになったので、yieldなどを手動で挿入する必要がない。
これは嬉しい。
これが1.14以降では、
関数の実行を監視しているモニター(sysmon)は、長時間実行されているgoroutineに対してpreemptフラグを立てる←これは1.13でも同じ
該当のgoroutineを処理しているスケジューラーにSIGURGというシグナルを送る
スケジューラーはSIGURGを受け取ったら、別のgoroutine(gsignal)を起動して実行しているgoroutineの代わりに割り付ける
gsignalはpreemptフラグが立っている場合に自分自身を停止する
へーですね。
先程のChan_async
にrange
を実装すると、
let[@warning "-nonreturning-statement"] rec recv_forever c f =
let open Syntax in
let v = ~>c in
ignore @@ f v;
recv_forever ()
;;
うん、普通だ。 syntacticに分岐がないので(実際にプロセス上はループを脱出できない)nonreturning statementで怒られるため、supressしている。
え~もうできたやん、使いましょう。
utop # let worker ~clock ~stdout id jobs results =
let open Chan_async.Syntax in
Chan_async.recv_forever jobs
@@ fun j ->
Eio.Flow.copy_string (Printf.sprintf "worker %d started job %d\n" id j) stdout;
Eio.Time.sleep clock 1.;
Eio.Flow.copy_string (Printf.sprintf "worker %d finished job %d\n" id j) stdout;
results <~ j * 2
;;
val worker :
clock:#Eio.Time.clock ->
stdout:#Eio.Flow.sink -> int -> int Chan_async.t -> int Chan_async.t -> unit =
<fun>
Chan_async.recv_forever jobs @@ fun j -> ...
をfor j := range jobs { ... }
と読み替えるともとの通りに読める。
utop # let main sw ~clock ~stdout =
let open Chan_async.Syntax in
let jobs = Chan_async.make () in
let results = Chan_async.make () in
let num_jobs = 5 in
let start = Eio.Time.now clock in
for w = 1 to 3 do
Eio.Fiber.fork ~sw
@@ fun () -> worker ~clock ~stdout w jobs results
done;
for j = 1 to num_jobs do
jobs <~ j
done;
for _ = 1 to num_jobs do
ignore @@ ~>results
done;
let end' = Eio.Time.now clock in
Eio.Flow.copy_string (Printf.sprintf "%f sec\n" (end' -. start)) stdout
;;
val main :
Eio.Switch.t -> clock:#Eio.Time.clock -> stdout:#Eio.Flow.sink -> unit =
<fun>
utop # let () =
Eio_main.run
@@ fun env -> Eio.Switch.run
@@ main ~clock:env#clock ~stdout:env#stdout
;;
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
2.000017 sec
^CUncaught exception in run loop:
Exception: Stdlib.Sys.Break
No backtrace available.
Exception:
Failure
"Deadlock detected: no events scheduled but main function hasn't returned".
なんか止まらないんでCtrl+Cしたらデッドロックしたんですけど!
まあrecv_forever
が止まらないからっすね。
Goの例ではチャンネルをちゃんとclose
していたのでworkerが止まった2が、OCamlの実装だと止まらない。
close
する
ということで真面目にclose
に向き合う。
domainslibはチャンネルに対するclose操作を提供していない。
とりあえず、close
するとフラグを立てて、フラグ立ってたらsend/recv止まるようにしよう。
module Chan_async = struct
module C = Domainslib.Chan
type 'a t =
{ chan : 'a C.t
; closed : bool Atomic.t
}
let make () = { chan = C.make_unbounded (); closed = Atomic.make false }
exception Recv_closed
フラグとしてclosed
を持つ型'a t
を定義する。
thread-safeにしたいので、4.12から導入されたAtomic
コンテナを使う3。
天下り式に書くと、recvしたときすでにclosed
だと値を返しようがないので、適当な型の代わりに例外を発生させる。
let recv t =
match C.recv_poll t.chan with
| Some v -> v
| None ->
if Atomic.get t.closed then raise Recv_closed else Eio.Fiber.yield ();
recv t
;;
let rec send t v =
if C.send_poll t.chan v
then ()
else (
if Atomic.get t.closed then () else Eio.Fiber.yield ();
send t v)
;;
let close t = Atomic.set t.closed true
let rec recv_forever t f =
try
let v = recv t in
ignore @@ f v;
recv_forever t f
with
| Recv_closed -> ()
;;
end
他はまったく考えた通り。
recv_forever
でRecv_closed
例外を受ければループから抜けられる。
いいんじゃないでしょうか。
utop # let () =
Eio_main.run
@@ fun env -> Eio.Switch.run
@@ main ~clock:env#clock ~stdout:env#stdout
;;
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 2 finished job 2
worker 3 finished job 3
worker 1 started job 4
worker 2 started job 5
worker 1 finished job 4
worker 2 finished job 5
2.000396 sec
utop # ↵
よかったです。
ちなむと
Eioのexampleにあるworker poolのほうがeioっぽく書ける。
非同期なチャンネルはとりあえずEio.Stream
を使ってくださいとのことです。
Eio探訪記を書くつもりだったが、まあメジャーリリースされてからにするか。
-
"We recommend the users to utilise higher-level parallel programming libraries such as domainslib.", https://v2.ocaml.org/releases/5.0/manual/parallelism.html ↩
-
実はウソで、preemptiveなおかげか、
close
の行を消しても、チャンネルに対するrange
が受けているときに他のプロセスが終了して値がやってこないことが分かるとプロセス全体が終了する ↩