最近は専らPythonがマイブーム、シンプルで書きやすく、色々な環境で動かせて、パワー不足ならCythonを使ったり他言語でモジュール化して組み合わせたりと簡単に出来ます。 最近ではPCパワーがあるので余程ヘビーなことじゃなければPythonオンリーでこなせるし、ちょっとした事なら並列化対応も出来ます。 で、今回は並列実装の話を書いておきます。
Pythonの並列には大きく二種類あります、threadingとmultiprocessing。
前者はスレッドベースの並列で単一のプロセスの中でスレッドが立ち上がり、これによってグローバルなメモリなどを共有します。 これのポイントは、メモリなどを共有するのでスレッド間で簡単にデータを共有できますが、仕組みにより処理が完全に同時には行えません(短いスパンで切替を繰り返している)、待機時間等で処理が空いているときに他のスレッドが処理を行えるのでSelenium等で外部ブラウザの処理待ちなどが多発するケースで効率的に動けます。 後者はプロセスベースの並列で、複数のプロセスが立ち上がります、結果、各プロセスは独自のメモリ空間を持つので同じデータでもプロセス分だけ複製が発生してリソースを食いますが独立して同時処理が出来ます(プロセス間のやりとりにはプロセス間通信のPIPEやQueueを使います)
今回はthreading
基本的には、ある関数をスレッドとして起こします。
1 2 3 4 5 6 7 |
import threading def thread_one(): #some code my_thread = threading.Thread(target=thread_one) my_thread.start() |
threading.Threadでtargetにスレッドとして起こす関数を指定してインスタンスを作りstartします。
スレッドを走らせると同時に色々な処理が発生したりするので、状態を管理する必要があります。
状態管理のために用意されている機能として以下のような物があります。
Lock
単純なロックです。
.acquire(blocking=True, timeout=-1)
ロックを取得します、blockingがTrueの時にはロック済みの時ブロックされます(つまり他がロック済みの時に、そちらが開放されてロックが取得できるまで待ちます。 この待ち時間上限がtimeoutで-1の時には無期限に待ちます) .release()によってロックを開放します。
例えばsqliteを使って処理するときに同時アクセスを防ぐためにロックを使います。
1 2 3 4 5 6 7 8 9 10 |
sql_lock = threading.Lock() def my_thread(): sql_lock.acquire() sql.execute('INSERT INTO ...') sql_lock.release() for n in range(10): thread = threading.Thread(target=my_thread) thread.start() |
これによって、他のスレッドが共有リソースsqlを実行中に同時実行することを防げます。 また、acquire/release型の同期処理はwithを使う事が出来ます。
1 2 3 |
def my_thread(): with sql_lock: sql.execute('INSERT INTO ...') |
releaseは他のスレッドから開放することも出来る事に注意が必要です(ロックの取得が失敗したときにロックを開放するコードが走ったりすると、他のスレッドが取得したロックが開放されてしまいます)
ロック済みか確認するための.locked()が存在します。
RLock
再入可能ロックです。 ロックを複数回取得することが出来ますが、その回数だけ開放しないと完全に開放されません。 また、Lockと異なり取得したスレッドだけが開放できます。 ループ構造で複数回ロックが必要な箇所が発生する時などにこちらを使います。
Condition
状態間に使える待機機能です。
wait(timeout=None)で待機状態になり、notify(n=1) or notify_all()が実行されるかtimeoutするまで待機します。 timeout=Noneは無期限で、小数点型秒数を指定できます。 例えばログイン処理が完了したら複数のスレッドで一斉に処理を行いたいとか言ったときに、各スレッドはwait状態でログイン完了を待ち、ログイン処理が完了したらnotify_all()を呼び出してスレッドに処理を開始させます。
1 2 3 4 5 6 7 8 9 10 11 |
cond = threading.Condition() def my_thread(): cond.wait() # same codes cond.acquire() for n in range(10): thread = threading.Thread(target=my_thread) thread.start() cond.notify_all() |
インスタンスはCondition(lock=None)によって作ります。 引数lockとして既存のLockかRLockを与えることが出来、これによって挙動が変わります。 デフォルトではLockを内部で作成します。
nofity(n=1)の場合は、wait中のうちのn番目の物の処理を再開します(但しnの扱いは将来的に仕様変更の可能性がある)
Semaphore
セマフォです。 RLockに似て複数回の取得と開放が出来ます。
threading.Semaphore(n=1)でセマフォを作成して、.acquire(blocking=True, timeout=None)でセマフォを取得します。 セマフォは取得する度に1ずつ小さくなり、セマフォが既に0の時にブロックします。 .release(n=1)は逆にセマフォがnずつ増加します。 つまり、有限な個数のリソースが使えるときに、その数のセマフォを作成して、各スレッドがacquireして行くとリソースが不足するときにacquireが待機し、リソースを使い終わったスレッドがreleaseする(セマフォの数が1以上になる)事で処理が再開されます。 同時アクセス制限がある機能を呼び出したりする場合に有用です。
Event
Conditionに似たイベントを管理する機能です。
.Event()でインスタンスを作り、.set()でTrue、.clear()でFalseへ状態を変更します。
.wait(timeout=None)によりTrueになるまで待機します。 setされているか確認する.is_set()もありset状態ならTrue、そうで無ければFalseを返します。
あるスレッドが他のスレッドに処理させて結果が来るまで待機するような使い方が出来ます。
1 2 3 4 5 6 7 8 9 10 11 12 |
ev = threading.Event() def my_thread(): while True: ev.wait() # same codes ev.clear() thread = threading.Thread(target=my_thread) thread.start() ev.set() |
繰り返し実行されるmy_threadを都度作るのではなく既存スレッドとして待機させておいて、何かの操作等で実行開始させると言ったことが出来ます。 また、.is_set()で状態を知ることが出来るのでグローバルなフラグとしても使う事が出来ます(プログラムの終了処理をするとき。各スレッドに通知する)
Timer
タイマです。 スレッドの実装の一種でもあります。
.Timer(interval, function. args=None, kwargs=None)で作成します。 作成からinterval秒後にfunctionがargs/kwargsを引数として実行されます、呼び出される処理もまたスレッドです(本来のプログラムと並行して走る)
functionが実行される前は.cancel()によって停止する事が出来ます(functionが開始されると停止できません)
Tkinter等のGUIでは重い処理を呼び出すと画面側が固まってしまうので、スレッド化して処理を行うのがスマートです。 例えば、実行ボタンを押下すると実行ボタンをdisableに切り替えた上でスレッドを起こし、停止ボタンはEvent.set()して停止を通知し停止処理完了をEvent.wait()で待機し、スレッド側は停止できるタイミングでEvent.is_set()で状況を確認してTrueなら停止処理を実施した後、停止処理完了を通知して停止ボタン処理を再開させて実行ボタンを再度enableにするなどの実装を行います。
まだPythonの並列処理機能はそれほど充実していません。 例えばスレッドを外部から停止する手段がないです(thead.kill()のような) このためスレッド側は自ら停止できるように設計し状態を管理しなければなりません。 定期的に停止要求があるかスレッド側が確認したり、ブロックする処理はタイムアウトするように作る等です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
quit_flag = threading.Event() def worker_thread(): while True: if quit_flag.is_set(): return # working worker = threading.Thread(target=worker_thread) worker.start() while True: cmd = input('>') if cmd == 'quit': quit_flag.set() timeout = 60 while threading.active_count()>1: sleep(1) timeout -= 1 if timeout<1: raise MyTimeoutException() break |
quit_flagが立ったら各スレッドは適当なところで終了します。 threading.active_count()は生きているスレッドの数を返します、メインスレッド自身があるので 他の全てが終了した段階で1になります。 終了できないスレッドがあると進まないので強制的に移行するためタイムアウトを用意しています、しかしスレッドが生きている限りプログラムは生存します。 強制的に終了状態に遷移するためにはOSのシステムコールなどを呼び出して終了しなければなりません、このため並列の実装にはタイムアウトや制御できないブロック処理を書いてはいけません。 途中終了して良いスレッドの場合はThread()の引数daemonをTrueにするとメインスレッドが終了するときに強制的に停止します(Python3.3以降)
例えば先のコマンド入力自体をスレッドにする場合、これは悪手です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
quit_flag = threading.Event() def worker_thread(): while True: if quit_flag.is_set(): return # working worker = threading.Thread(target=worker_thread) worker.start() def command_thread(): while True: if quit_flag.is_set(): return cmd = input('>') if cmd == 'quit': quit_flag.set() timeout = 60 while threading.active_count()>1: sleep(1) timeout -= 1 if timeout<1: raise MyTimeoutException() break control_thread = threading.Thread(target=command_thread) control_thread.start() |
一見終了できそうですが、inputは何かが送信(通常Enter)されるまで処理をブロックするので他で終了イベントが発生してもプログラムは終了できません(sys.exitも届きません、つまり親スレッドでsys.exitしても親スレッドだけが終了するし、子スレッドでsys.exitしても子スレッドしか終了しません)
control_thread = threading.Thread(target=command_thread, deamon=True)
この場合はdaemonなので、強制的に終了されるのでプログラムは終了できます。 「強制的」である点に注意してください。 つまり何ら終了処理を伴わずに処理が停止されエラーメッセージが表示されることがあります(input状態のdaemonが終了するときFatal errorを生み出します) 従って綺麗に終了するためには、これらのブロックが発生する処理は現状のPythonではメインスレッド上にのみ実装するようにします。 子スレッドはメインスレッドをos.kill()によって強制的に停止することが出来ます。
1 |
os.kill(os.getpid(), signal.SIGTERM) |
threadingは一つのプロセス上で動くので、getpid()した自分自身のプロセスにkillを送信する事でメインスレッドを停止しプログラムは終了できます(Pythonは必ずメインスレッドにシグナルが到達します) しかし、現状では終了シグナルではシグナルハンドラは起動されないので強制終了できるだけで任意の終了処理、例えば保存して終了などは実行できません。 従って、ある子スレッドが終了を要求するとき必要な処理を行い停止フラグを立てて各スレッドが停止した後にメインスレッドをKillします、綺麗では無いけど一番マシなデザインです。
(494)