2022/12/28 24時間AIファッションショーのベータ版配信を開始しました!!!

16-27. Pythonのsubprocess.Popen()による並列実行でリアルタイム標準出力がしたかった備忘録

やること

Pythonでサブプロセスを複数実行したいことがあるので、subprocess.Popen() を用いた「直列実行 or 並列実行」「おまとめ出力 or リアルタイム出力」の4通りの組み合わせの備忘録を残しておきます。

参考文献

まず、こちらのサイトは subprocess.run() と subprocess.Popen() の関係を完全にまとめてくださっています。

subprocessについてより深く(3系,更新版) - Qiita
はじめに 2017年に書いた記事の内容が2系ベースであり,かついい加減情報を更新したほうがいいなと思い,編集に着手した結果,subprocess.run()をはじめとする大幅な追記が必要となりそうになったため,本記事を新規に作成...

リアルタイム出力についてはこちらを参考にさせていただきました。

Pythonから別プログラムを実行し、その出力を実行元で得たい。
### 前提・実現したいこと Pythonから別プログラムを実行し、その出力を実行元で得たい。 下記のようにプログラムを書いたのですが何も出力されず実行されたところで止まってしまいます。

恩返しのつもりで私も備忘録を垂れ流します。

実行環境

WinPython3.6をおすすめしています。

WinPython - Browse /WinPython_3.6/3.6.7.0 at SourceForge.net
Portable Scientific Python 2/3 32/64bit Distribution for Windows

直列実行 × おまとめ出力

まずは5つの子プロセスを直列実行し、子が終了する度におまとめ出力する方法です。

「sub1.py」という子を作りました。子IDとして引数を受け取り、開始時と終了時に print() します。中身の処理は3秒かけて hello と出力するだけです。

import sys
import time

my_id = int(sys.argv[1])

print('start {}'.format(my_id))

#3秒かけてhelloを出力
time.sleep(3)
print('hello {}'.format(my_id))

print('end {}'.format(my_id))

これをメインプログラムから5回呼び出します。本来、subprocess.Popen() は子の終了を待たずに先に進むのですが、proc.communicate() が子の終了を待つコマンドのため直列になります。

import subprocess

for i in range(5):
    print('### master loop {}'.format(i))
    
    #子の実行コマンド
    command = 'C:/Users/aaa/Desktop/WPy-3670/python-3.6.7.amd64/python.exe sub1.py {}'.format(i)
    
    #子の処理を開始(標準エラーは標準出力に合体)
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    
    #子の終了を待ち、一気に標準出力
    out = proc.communicate()[0].decode('utf8', 'replace')
    print(out)

直列なので15秒かかりました。それぞれの子の出力(startからendの3行)が一気に表示されるのが特徴です。

並列実行 × おまとめ出力

次に、これを並列実行にしてみます。

「sub1.py」は共通で、メインを次のようにします。subprocess.Popen() で5つの子を投げきります。その後、子の終了を順番に proc.communicate() で待っておまとめ出力します。

procs = {}
for i in range(5):
    print('### master loop {}'.format(i))
    
    #子の実行コマンド
    command = 'C:/Users/aaa/Desktop/WPy-3670/python-3.6.7.amd64/python.exe sub1.py {}'.format(i)
    
    #子の処理を開始(標準エラーは標準出力に合体)
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    
    #子を格納
    procs[i] = proc
    
#順番に子の終了を待ち、一気に標準出力
for proc in procs.values():
    out = proc.communicate()[0].decode('utf8', 'replace')
    print(out)

並列なので3秒で終わりました。最初の子が終了していれば他の子も終了しているので、結局すべての子の出力が一瞬で出ることに注意してください。

直列実行 × リアルタイム出力

次に、5つの子プロセスを直列実行し、子の出力をリアルタイムで標準出力します。

「sub2.py」という子を作りました。やっていることはsub1.pyと同じですが、print() の代わりに sys.stdout.write() と sys.stdout.flush() を使っています。こうしないとリアルタイム出力できないようです。明示的に改行を入れているのもご注意ください。

import sys
import time

my_id = int(sys.argv[1])

sys.stdout.write('start {}\n'.format(my_id))
sys.stdout.flush()

#3秒かけてhelloを出力
time.sleep(3)
sys.stdout.write('hello {}\n'.format(my_id))
sys.stdout.flush()

sys.stdout.write('end {}\n'.format(my_id))
sys.stdout.flush()

次のメインプログラムで5回呼び出します。proc.stdout.readline() で1行ずつ出力を待つのがポイントです。

for i in range(5):
    print('### master loop {}'.format(i))
    
    #子の実行コマンド
    command = 'C:/Users/aaa/Desktop/WPy-3670/python-3.6.7.amd64/python.exe sub2.py {}'.format(i)
    
    #子の処理を開始(標準エラーは標準出力に合体)
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    
    #子をリアルタイム出力
    while True:
        #子から出力が1行あるまで待ち受ける
        line = proc.stdout.readline().decode('utf8', 'replace')
        if line:
            print(line, end='')
        if not line and proc.poll() is not None:
            break

これも直列なので15秒かかりますが、子のstartが一足先に出力されていることからリアルタイム出力であることが分かります。

並列実行 × リアルタイム出力(疑似)

最後に問題児です。言い訳をすると、理想的な形にするには非常に複雑な処理が必要で・・・、ということで妥協して「疑似」リアルタイム並列出力しています。

「sub2.py」は共通で、メインを次のようにします。subprocess.Popen() で5つの子を投げきった後、子0から順に繰り返し1行出力を待ち伏せます。

procs = {}
for i in range(5):
    print('### master loop {}'.format(i))
    
    #子の実行コマンド
    command = 'C:/Users/aaa/Desktop/WPy-3670/python-3.6.7.amd64/python.exe sub2.py {}'.format(i)
    
    #子の処理を開始(標準エラーは標準出力に合体)
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    
    #子を格納
    procs[i] = proc
    
#並列処理中の子のリアルタイム出力(ただし同期なので注意)
while True:
    
    #辞書にある子をチェック
    for key in procs.keys():
        #子から出力が1行あるまで待ち受ける
        line = procs[key].stdout.readline().decode('utf8', 'replace')
        if line:
            print(line, end='')
        
        #子が終わっていればNoneに
        if not line and procs[key].poll() is not None:
            procs[key] = None
    
    #辞書を圧縮
    procs = {key: value for key, value in procs.items() if value is not None}
    
    #すべての子が終わったら終了
    if len(procs) == 0:
        break

並列なので3秒です。子0から順に出力を待ち伏せるため、出力はかならず子0→子4の順を繰り返します。調子の良い子が2回連続で出力することはありません。そして残念なことに、一番調子が悪い子が律速になります。子の出力をテキスト保存するようにすれば調子の良い子から先に終わる気がしますが、今回のようなメインへの出力に関しては遅い子が律速になります。

結局

結局のところUbuntuかなんかに持っていってシェルスクリプトで制御するのがシンプルです。おそらくこんな感じの「main.sh」で5並列の2直列とかもいけると思います(十分に検証していません)

#!/bin/sh

cat << EOS | xargs -P 5 -I{} sh -c '{}'
python3 sub1.py 0
python3 sub1.py 1
python3 sub1.py 2
python3 sub1.py 3
python3 sub1.py 4
EOS

cat << EOS | xargs -P 5 -I{} sh -c '{}'
python3 sub1.py 5
python3 sub1.py 6
python3 sub1.py 7
python3 sub1.py 8
python3 sub1.py 9
EOS

以上です。

タイトルとURLをコピーしました