11. runとruns

処理フロー 」の節で説明したように、処理フローオブジェクトに登録された処理フローの実行には、 run もしくは runs メソッドが用いられる。 処理フローを構成する処理メソッドはthread上に展開され 1 2、メソッド間にパイプラインが敷設される。 そして与えられた入力データがパイプラインを流れ、各thread上の処理メソッドが流れ来るデータを処理していく。 このように、全てのthreadはMISD(Multiple Instruction Single Data)型の並列処理で実行される。 MISD型の並列処理の欠点は、処理フローの中で相対的に遅い処理メソッドがあれば、そこがボトルネックになるということだ。 ただし、より大規模な処理であれば、「 処理フロー 」の節で紹介した SIMD(Single Instruction Multiple Data)型の並列処理と組み合わせることで、 より効率的な処理を実現できる。

11.1. run: メンバーメソッド

run は、実行前に自動追加される処理メソッドを加えた上で処理フローを構成する。 その後、各処理メソッドはthread上に展開され実行されるが、 処理メソッドの数が多い場合は、それら全てが同時に実行されるわけではない。 メモリの圧迫や、ワークファイルの増加など、処理速度を低下させる要因が増え効率が上がらなくなるからである。 さらに1プロセスで利用できるファイルのオープン数制限もある(パイプも特殊なファイルの一種で、read/writeで2回のオープン数となる)。 そこで、同時実行thread数を経験的に制限している。 処理メソッドの平均的なメモリ利用量は47MB程度であり、実メモリ量を47MBで割った値を同時実行数として割り当てている。 他にどのようなプロセスが実行しているかは見ておらず、あくまでも物理的なメモリ要量から計算される。 例えば4GBメモリのマシンであれば、同時実行thread数の上限は86(=4084/47)となる。 この上限を変更することは可能で、2つの方法がある。 1つは、環境変数 KG_RUN_LIMIT にその値を設定することで、 もう一つは、 run の引数に runlimit を与えることである( リスト 11.1 )。 両方が同時に与えられた時は、 run の引数が優先される。

リスト 11.1 同時実行thread数の上限変更の2つの方法
1>>> import os
2>>> import nysol.mcmd as nm
3>>> os.environ['KG_RUN_LIMIT'] = '500' # 環境変数による設定
4>>> nm.mcut(f="a",i=dat).run(runlimit=500) # runの引数による設定

ある処理フローオブジェクトで実行される処理メソッドの数が上限を上回る場合には、 処理フローの「適当なところ」でフローを切断し(パイプ接続を切り)、 切断された箇所の結果をワークファイルへの出力に切り替える。 そして切断された先の処理は、前の全threadの処理が終わった後でワークファイルから読み込み、処理が継続されることになる。 極端な例として、runlimit=1 と設定すると、全てのメソッドは切断され、 全てのメソッドがシングルthreadで動作し、入力ファイルに近い処理メソッドから順番に起動され、 それらの処理メソッド間のデータのやりとりは、ワークファイルへの入出力で実現される。 結果としてファイルI/Oが多発して遅くなってしまう。

さて、処理フローを切断する「適当なところ」とはどこであろうか? mcmdの処理フローは循環のない有向グラフ(DAG:Directed Acyclic Graph)で表すことができる(詳細は「 処理フロー 」の節を参照)。 そこで概ね、次のようなヒューリスティックで切断位置を決めている。 入力データから始め、DAG上を有向辺に沿って幅優先で巡回していき、 訪れた節点に番号を振っていく。 その番号がthread数の上限を超えたところで、その番号以下の処理メソッドを1つのブロックとして切断する。 この操作を繰り返すことで、処理フロー全体を複数のブロックに分割し、順次threadの並列処理にて実行していっている。 実行中に起動されたthreadの数を確認したければ top コマンドを用いればよい。

以上のような切断方法以外にも、より処理効率を高めるための切断点の検出方法は存在するであろう。 ただ、各メソッドのアルゴリズムの処理効率、ワークファイル利用の有無、フロー分岐後の切断へのペナルティなど、 考慮すべき要因が多く、それらの要因を考慮した最適切断点の検出は今後の課題である。

threadのスタックサイズ

処理メソッドはthread上で実行されるが、threadにはスタックサイズに上限があり、 この制約のためにメモリエラーが生じることがある。 処理メソッドはC++で実装されており、C++ではローカル関数のauto変数は全てスタック上に確保される。 スタックの方が、スタティック領域やヒープ領域より速度が早いのが理由である。 処理メソッド内部のデータバッファやソーティングバッファなど大きなメモリを必要とするものはヒープを用いているが、 例えば、 mselstrv= で指定する文字列リストなどはスタック領域を使っている。 もし v=a1,a2,a3,...,a1000000 のような指定をすると 3 、途端にthreadのスタック領域を使い切ってしまう。

threadのスタックサイズの上限は1048576( \(2^{20}\) )バイトである。 この値は、環境変数 KG_THREAD_STK を設定することで変更可能である( リスト 11.2 )。 ただし、16384以上の16の倍数にしなければエラーとなる 4

リスト 11.2 theadのスタックサイズの変更
1>>> import os
2>>> os.environ['KG_THREAD_STK'] = '2097152'

11.2. runs: クラスメソッド

前節の run メソッドは、最終出力が1つの時に呼び出す処理フローオブジェクトのメンバーメソッドであるのに対し、 複数の出力を持つような処理フローの実行には runs が用いられる(詳細は、「 処理フロー 」を参照)。 より正確には、シンク節点を複数持つDAGである時に runs を用いる。 run で実行できるフローを runs で実行することはなんの問題もない。 しかし逆に、出力を複数もちフローを複数の run メソッドで実行すると、処理結果は同じでも処理効率に違いが出てくる。 runs も基本的には run とやっていることはほぼ同じであるが、 runs では、指定された複数の処理フローオブジェクトを統合して、DAGを再構成する点に違いがあるからである。 よって、複数の run を実行すると、統合した場合には共通化できるフローが別々に重複して実行されてしまい、効率が落ちるのである。

11.3. 返り値

run の返り値は、最終出力で o=ファイル名 を指定していればそのファイル名を返す。 o= を省略していれば、もしくは o=リスト を指定している時は、結果をリストで返す。 runs では最終出力が複数になるので、 run と同様のルールで、結果をリストで返す。 リスト 11.3 に例を示す。

リスト 11.3 o=の指定と返り値の例
 1dat1=[
 2["key","val"],
 3["a",1],
 4["a",2],
 5["b",3],
 6["b",4],
 7]
 8
 9# o=にファイル名を指定すると、ファイル名を返す。
10ret=nm.mcut(f="key,val",i=dat1,o="out1.csv").run()
11print(ret)
12# out1.csv
13
14# o=にリストを指定すると、結果データをリストで返す。
15out1=[]
16ret=nm.mcut(f="key,val",i=dat1,o=out1).run()
17print(ret)
18# [['a', '1'], ['a', '2'], ['b', '3'], ['b', '4']]
19
20# o=を省略すると結果をPythonリストで返す。
21ret=nm.mcut(f="key,val",i=dat1).run()
22print(ret)
23# [['a', '1'], ['a', '2'], ['b', '3'], ['b', '4']]
24
25# runsで実行すると、出力ファイル名のリストを返す。
26fa=None
27fb=None
28fa <<= nm.mselstr(f="key",v="a",i=dat1)
29fb <<= fa.redirect("u")
30fa <<= nm.msum(k="key",f="val",o="out1.csv")
31fb <<= nm.msum(k="key",f="val",o="out2.csv")
32ret=nm.runs([fa,fb])
33print(ret)
34# ['out1.csv', 'out2.csv']
35
36# runsでリスト出力とファイル出力を混在させた場合
37out1=[]
38fa=None
39fb=None
40fa <<= nm.mselstr(f="key",v="a",i=dat1)
41fb <<= fa.redirect("u")
42fa <<= nm.msum(k="key",f="val",o=out1)
43fb <<= nm.msum(k="key",f="val",o="out2.csv")
44ret=nm.runs([fa,fb])
45print(ret)
46# [[['a', '3']], 'out2.csv']

11.4. メッセージ制御

runruns に共通したパラメータとしてメッセージ出力の有無を制御する msg= を指定できる。 msg="on" とすれば処理フロー上の各メソッドが終了した時に終了メッセージが表示される。

リスト 11.4 mcmdのインポートと入力データの設定
 1import nysol.mcmd as nm
 2dat=[
 3["customer","date","amount"],
 4["A","20180101",5200],
 5["B","20180101",800],
 6["B","20180112",3500],
 7["A","20180105",2000],
 8["B","20180107",4000]
 9]
10nm.mcut(f="customer,amount",i=dat).run(msg="on")
11#END# kgload -nfn; IN=0 OUT=6; 2018/09/10 08:56:55; 2018/09/10 08:56:55
12#END# kgcut f=customer,amount; IN=5 OUT=5; 2018/09/10 08:56:55; 2018/09/10 08:56:55
13#END# kgload; IN=0 OUT=0; 2018/09/10 08:56:55; 2018/09/10 08:56:55
14nm.mcut(f="customer,amount",i=dat).run(msg="off")
15# "on"以外の文字列を与えるか、省略すると終了メッセージは表示されない。

さらに、環境変数 KG_VerboseLevel を設定することでメッセージをより細かく制御することができる。 以下に、設定値とその内容を、設定例を リスト 11.5 に示す。 msg="on" を指定しなかった場合は、KG_VerboseLevel=2 の設定に従ってメッセージが表示される。 すなわち、errorとwarningメッセージのみ表示する。 この値は変更できない。 変更できるのは msg="on" を指定したときの振る舞いで、 環境変数 KG_VerboseLevel の設定に従ったメッセージが表示される。

内容

0

メッセージを一切出力しない

1

+ error メッセージ出力

2

+ warning メッセージ出力(msg="on"以外の場合)

3

+ end メッセージ出力

4

+ msg メッセージ出力 (msg="on"のデフォルト)

リスト 11.5 メッセージの表示レベルの変更例
1import os
2os.environ['KG_VerboseLevel'] = '0' # run(msg="on")とした時はメッセージを一切表示しなくなる。

Footnotes

1

内部的には POSIX thread を使っている。

2

cmdrunfunc ( 特殊な処理メソッド )はthread上でforkしてプロセスとして起動される。

3

このような処理は、条件となる複数の文字列をPythonリストもしくはCSVに格納し mcommon を用いればよい。

4

この制約に違反した時、OSによっては自動調整されるが、macだと stack size change error で停止してしまう。