Pythonで非同期の雰囲気

概要

重い処理を同時実行したいときに、非同期を使う。Pythonの非同期処理には2つのライブラリが存在し、以下のような簡単な方法でどちらを使うべきか知ることができる。

  • asyncio

    重い処理がasyncioに対応しているとき。(I/O待ちであるとき=await asyncio.sleep(n)がその関数の中で呼ばれているとき。)

  • concurrent.futuresProcessPoolExecutorおよび** joblibParrarel**

    重い処理がasyncioに対応していないとき。(CPUが動いているとき。)

どうでもいい注意

  • joblibで使われるlokyはこのconcurrent.futuresが元になっているが、ローカル関数が使えたり、(デッドロックせずに)入れ子された(nested)非同期処理が可能になるため、より推奨される。
  • ProcessPoolExecutorは内部でmultiprocessingモジュールを使っているが、このモジュールを直接使うのは特別な理由がない限り推奨しない。
  • これらはasyncio.loop.run_in_executorで併用できるが、あまり使われない。
  • わかりやすくするためにtasksresultsを分けて書いたが、joblibでは普通一行で書く。

asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

#目的の関数
async def function():
pass

async def loop():
tasks = [asyncio.create_task(function()) for i in range(10)]
results = await asyncio.gather(*tasks)
return results

def main():
results = asyncio.run(loop)

joblib

1
2
3
4
5
6
7
8
9
10
11
12
13
import joblib

#目的の関数
def function():
pass

def loop():
tasks = [joblib.delayed(function)() for i in range(10)]
results = joblib.Parallel(n_jobs=-1)(tasks)
return results

def main():
results = loop()

concurrent.futures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import concurrent.futures
# import loky

#目的の関数
def function():
pass

def loop():
with concurrent.futures.ProcessPoolExecutor() as executor:
# with loky.get_reusable_executor() as executor:
tasks = [executor.submit(function) for i in range(10)]
results = [future.result() for future in concurrent.futures.wait(tasks).done]
return results

def main():
results = loop()

ProgressBarをつける

1
pip install tqdm

asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
# import tqdm
import tqdm.asyncio

#目的の関数
async def function():
pass

async def loop():
tasks = [asyncio.create_task(function()) for i in range(10)]
# results = [await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))] # 古いやり方
results = await tqdm.asyncio.gather(*tasks)
return results

def main():
results = asyncio.run(loop())

joblib

1
pip install tqdm_joblib
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import joblib
from tqdm_joblib import tqdm_joblib

#目的の関数
def function():
pass

def loop():
with tqdm_joblib():
tasks = [joblib.delayed(function)() for i in range(10)]
results = joblib.Parallel(n_jobs=-1)(tasks)
return results

def main():
results = loop()

concurrent.futures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import concurrent.futures
import tqdm

#目的の関数
def function():
pass

def loop():
with concurrent.futures.ProcessPoolExecutor() as executor:
tasks = [executor.submit(function) for i in range(10)]
results = [future.result() for future in tqdm.tqdm(concurrent.futures.as_completed(tasks), total=len(tasks))]
return results

def main():
results = loop()

コメント