ゴミ話#
久しぶりにブログを書いていなかったので、もう水をかけるわけにはいかないと思い、自分に目標を設定しました。concurrent.future に関する内容を書こうと思い、この文章では Python 3.2 で新たに追加された concurrent.future モジュールについてお話しします。
本文#
Python の非同期処理#
ある Python 開発者の小明は、面接中に突然次のような要求を受けました:いくつかのウェブサイトにリクエストを送り、彼らのデータを取得すること。小明は考えました、簡単だ、パパッと書いてみました。
import multiprocessing
import time
def request_url(query_url: str):
time.sleep(3) # リクエスト処理ロジック
if __name__ == '__main__':
url_list = ["abc.com", "xyz.com"]
task_list = [multiprocessing.Process(target=request_url, args=(url,)) for url in url_list]
[task.start() for task in task_list]
[task.join() for task in task_list]
簡単ですね。さて、新しい要求が来ました。各リクエストの結果を取得したいのですが、どうすればいいでしょうか?小明は考え、次のようなコードを書きました。
import multiprocessing
import time
def request_url(query_url: str, result_dict: dict):
time.sleep(3) # リクエスト処理ロジック
result_dict[query_url] = {} # 結果を返す
if __name__ == '__main__':
process_manager = multiprocessing.Manager()
result_dict = process_manager.dict()
url_list = ["abc.com", "xyz.com"]
task_list = [multiprocessing.Process(target=request_url, args=(url, result_dict)) for url in url_list]
[task.start() for task in task_list]
[task.join() for task in task_list]
print(result_dict)
よし、面接官が言いました。「うん、見た目は良いね。さて、もう少し問題を変えよう。まず、主プロセスをブロックしてはいけない。主プロセスはタスクの現在の状態(終了 / 未終了)に基づいて、対応する結果をタイムリーに取得する必要がある。どう改良する?」小明は考えました。「それなら、セマフォを使って、タスクが完了した後に親プロセスに信号を送るのはどうだろう?それとももっと簡単な方法はないのか?」どうやらそれは無さそうです。最後に面接官は心の中で「naive」と言い、顔には笑みを浮かべて小明に帰って待つように言いました。
小明の窮地から、私たちは次のような問題を見出すことができます。私たちが最もよく使用する multiprocessing
または threading
の 2 つのモジュールは、非同期タスクを実現したい場面では、実は少し不親切です。私たちはしばしば、比較的クリーンに非同期の要求を実現するために、いくつかの追加作業を行う必要があります。このような窮地を解決するために、2009 年 10 月にブライアン・クインラン氏が PEP 3148 を提案しました。この提案では、私たちがよく使用する multiprocessing
と threading
モジュールをさらにラップし、非同期操作をより良くサポートすることを目指しました。最終的にこの提案は Python 3.2 に導入されました。つまり、今日は concurrent.future についてお話しします。
Future モード#
新しいモジュールについて本格的に話し始める前に、Future
モードに関する関連情報を理解する必要があります。
まず、Future
モードとは何でしょうか?
Future
は実際には生産者 - 消費者モデルの一種の拡張です。生産者 - 消費者モデルでは、生産者は消費者がデータを処理するタイミングや、消費者が処理した結果には関心を持ちません。例えば、私たちはよく次のようなコードを書きます。
import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
self.queue.put('one product')
print(multiprocessing.current_process().name + str(os.getpid()) + ' produced one product, the no of queue now is: %d' %self.queue.qsize())
sleep(randint(1, 3))
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
d = self.queue.get(1)
if d != None:
print(multiprocessing.current_process().name + str(os.getpid()) + ' consumed %s, the no of queue now is: %d' %(d,self.queue.qsize()))
sleep(randint(1, 4))
continue
else:
break
#create queue
queue = multiprocessing.Queue(40)
if __name__ == "__main__":
print('ワクワクする!')
#create processes
processed = []
for i in range(3):
processed.append(Producer(queue))
processed.append(Consumer(queue))
#start processes
for i in range(len(processed)):
processed[i].start()
#join processes
for i in range(len(processed)):
processed[i].join()
これが生産者 - 消費者モデルの簡単な実装です。私たちは multiprocessing
の Queue
を通信チャネルとして利用し、生産者はキューにデータを投入し、消費者はキューからデータを取得して処理します。しかし、上記のように、このモデルでは生産者は消費者がデータを処理するタイミングや、処理結果には関心を持ちません。一方、Future
では、生産者がメッセージ処理の完了を待つことができ、必要に応じて関連する計算結果を取得することもできます。
例えば、次のような Java コードを見てみましょう。
package concurrent;
import java.util.concurrent.Callable;
public class DataProcessThread implements Callable<String> {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(10000);//データ処理をシミュレート
return "データ返却";
}
}
これは私たちがデータを処理するためのコードです。
package concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class MainThread {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
DataProcessThread dataProcessThread = new DataProcessThread();
FutureTask<String> future = new FutureTask<String>(dataProcessThread);
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(future);
Thread.sleep(10000);//他の業務処理をシミュレート
while (true) {
if (future.isDone()) {
System.out.println(future.get());
break;
}
}
executor.shutdown();
}
}
これは私たちのメインスレッドです。皆さんは、データ処理タスクの状態を簡単に取得できることがわかります。同時に関連する結果も取得できます。
Python における concurrent.futures#
前述のように、Python 3.2 以降、concurrent.futures は組み込みモジュールであり、直接使用できます。
注意: Python 2.7 で concurrent.futures を使用する必要がある場合は、pip でインストールしてください。
pip install futures
さて、準備が整ったので、このものをどう使うか見てみましょう。
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
if __name__ == '__main__':
pool = ProcessPoolExecutor(max_workers=2) # 最大2つのタスクを受け入れるプロセスプールを作成
future1 = pool.submit(return_future_result, ("hello")) # プロセスプールにタスクを追加
future2 = pool.submit(return_future_result, ("world")) # プロセスプールにタスクを追加
print(future1.done()) # タスク1が終了したかどうかを判断
time.sleep(3)
print(future2.done()) # タスク2が終了したかどうかを判断
print(future1.result()) # タスク1が返した結果を確認
print(future2.result()) # タスク2が返した結果を確認
まず from concurrent.futures import ProcessPoolExecutor
で concurrent.futures
から ProcessPoolExecutor
をインポートし、後のデータ処理を行います。(concurrent.futures
では、私たちに 2 種類の Executor
を提供しています。一つは現在使用している ProcessPoolExecutor
、もう一つは ThreadPoolExecutor
です。これらは外部に公開されているメソッドが一致しており、皆さんは実際のニーズに応じて選択できます。)
次に、最大容量が 2 のプロセスプールを初期化します。そして、プロセスプールの submit
メソッドを呼び出してタスクを提出します。面白い点が来ました。submit
メソッドを呼び出すと、特別な変数が得られます。この変数は Future
クラスのインスタンスであり、将来完了する操作を表します。言い換えれば、submit
が Future
インスタンスを返すとき、私たちのタスクはまだ完了していない可能性があります。Future
インスタンスの done
メソッドを呼び出すことで、現在のタスクの実行状態を取得できます。タスクが終了した後、result
メソッドを呼び出すことで返された結果を取得できます。もし後続のロジックを実行する際に、何らかの理由でタスクをキャンセルしたい場合は、cancel
メソッドを呼び出して現在のタスクをキャンセルできます。
さて、新しい問題が出てきました。たくさんのタスクを提出したい場合はどうすればいいのでしょうか?concurrent.future
は私たちがタスクを一括で追加するための map
メソッドを提供しています。
import concurrent.futures
import requests
task_url = [('http://www.baidu.com', 40), ('http://example.com/', 40), ('https://www.github.com/', 40)]
def load_url(params: tuple):
return requests.get(params[0], timeout=params[1]).text
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
for url, data in zip(task_url, executor.map(load_url, task_url)):
print('%r page is %d bytes' % (url, len(data)))
ええ、concurrent.future
のスレッド / プロセスプールが提供する map
メソッドは、標準ライブラリの map
関数と使用方法が同じです。
concurrent.futures を解析する#
前述のように concurrent.futures
の使い方を説明した後、私たちは concurrent.futures
がどのように Future
モードを実現しているのか、タスクと結果をどのように関連付けているのかに興味を持っています。今から submit
メソッドから始めて、ProcessPoolExecutor
の実装を簡単に見てみましょう。
まず、ProcessPoolExecutor
を初期化する際、その __init__
メソッドでいくつかの重要な変数の初期化操作が行われます。
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""新しい ProcessPoolExecutor インスタンスを初期化します。
引数:
max_workers: 指定された呼び出しを実行するために使用できるプロセスの最大数。
None または指定されていない場合は、マシンのプロセッサ数に応じて
ワーカープロセスが作成されます。
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
else:
if max_workers <= 0:
raise ValueError("max_workers は 0 より大きくなければなりません")
self._max_workers = max_workers
# 呼び出しキューのサイズをプロセス数よりも少し大きくして
# ワーカープロセスがアイドル状態になるのを防ぎます。しかし、あまり大きくしないでください。
# 呼び出しキュー内の futures はキャンセルできません。
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
# 終了したワーカープロセスは、キューの自身のワーカースレッドで
# スプリアスな "broken pipe" トレースバックを生成する可能性があります。しかし、私たちは
# 終了したプロセスを検出するので、トレースバックを無視します。
self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
# pids からプロセスへのマップ
self._processes = {}
# シャットダウンは二段階のプロセスです。
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
さて、今日のエントリーポイントである submit
メソッドを見てみましょう。
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('子プロセスが突然終了しました。'
'プロセスプールはもはや使用できません')
if self._shutdown_thread:
raise RuntimeError('シャットダウン後は新しい futures をスケジュールできません')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
# キュー管理スレッドを起こす
self._result_queue.put(None)
self._start_queue_management_thread()
return f
まず、渡された引数 fn
は私たちの処理関数であり、args
および kwargs
は fn
関数に渡す引数です。submit
関数の最初で、まず _broken
と _shutdown_thread
の値に基づいて、現在のプロセスプール内の処理プロセスの状態と現在のプロセスプールの状態を判断します。もし処理プロセスが突然終了したり、プロセスプールがすでにシャットダウンされている場合、現在の submit
操作を受け付けないことを示す例外がスローされます。
前述の状態に問題がなければ、まず Future
クラスをインスタンス化し、そのインスタンスと処理関数および関連する引数を使って _WorkItem
クラスをインスタンス化します。次に、インスタンス w
を値として、_queue_count
をキーとして _pending_work_items
に保存します。そして、_start_queue_management_thread
メソッドを呼び出して、プロセスプール内の管理スレッドを開始します。今、この部分のコードを見てみましょう。
def _start_queue_management_thread(self):
# エグゼキュータが失われた場合、weakref コールバックがキュー管理スレッドを起こします。
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
# プロセスを開始して、そのセントinelが知られるようにします。
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
この部分は非常にシンプルです。まず _adjust_process_count
メソッドを実行し、その後 _queue_management_worker
メソッドを実行するためのデーモンスレッドを開始します。私たちはいくつかの変数を渡しました。まず、_processes
は私たちのプロセスマッピングであり、_pending_work_items
には待機中のタスクが保存されています。また、_call_queue
と _result_queue
もあります。さて、もう一つのパラメータは皆さんがあまり理解していないかもしれませんが、それは weakref.ref(self, weakref_cb)
です。
まず、Python はガーベジコレクション機構を持つ言語であり、GC (Garbage Collection) 機構があるということは、ほとんどの場合、メモリの割り当てと解放を気にする必要がないことを意味します。Python では、オブジェクトがいつ回収されるかは、その参照カウントによって決まります。参照カウントが 0 になると、そのオブジェクトは回収されます。しかし、交差参照やその他の理由により、オブジェクトの参照カウントが常に 0 にならない場合、そのオブジェクトは回収されず、メモリリークが発生します。したがって、通常の参照とは異なり、Python では弱参照という参照メカニズムが追加されました。弱参照の意味は、ある変数がオブジェクトを保持していても、そのオブジェクトの参照カウントを増やさないことです。したがって、weakref.ref(self, weakref_cb)
は大多数の場合、self
と等価です(ここでなぜ弱参照を使用するのかについては、別の章で説明します)。
さて、この部分のコードを見終わったので、_queue_management_worker
がどのように実装されているか見てみましょう。
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
"""このプロセスとワーカープロセス間の通信を管理します。
この関数はローカルスレッドで実行されます。
executor_reference: このスレッドを所有する ProcessPoolExecutor への weakref.ref
引数:
process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。
このスレッドが終了できるかどうかを判断するために使用されます。
pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: work ids の queue.Queue e.g. Queue([5, 6, ...])。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
"""
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# これは上限です
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# キューのリソースをできるだけ早く解放します。
call_queue.close()
# 作成されたプロセスで .join() が呼ばれない場合、
# 一部の multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
for p in processes.values():
p.join()
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされなければなりません
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが "
"実行中または保留中に突然終了しました。"
))
# オブジェクトへの参照を削除します。See issue16284
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます:キューまたはその
# ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
if isinstance(result_item, int):
# PID を使用してワーカーをクリーンにシャットダウンします
# (エグゼキュータを壊れたとマークしない)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。See issue16284
del work_item
# シャットダウンを開始するべきかどうかを確認します。
executor = executor_reference()
# 次の条件下では新しい作業アイテムを追加できません:
# - インタプリタがシャットダウン中であるか
# - このワーカーを所有するエグゼキュータが収集されたか
# - このワーカーを所有するエグゼキュータがシャットダウンされたか。
if shutting_down():
try:
# 新しい作業アイテムを追加できないので、保留中の作業アイテムがない場合は
# このスレッドをシャットダウンするのが安全です。
if not pending_work_items:
shutdown_worker()
return
except Full:
# これは問題ではありません:最終的には目を覚まし(result_queue.get() で)
# 再びセントinelを送信できるようになります。
pass
executor = None
おなじみの無限ループです。ループの最初のステップでは、_add_call_item_to_queue
関数を使用して、待機キュー内のタスクを呼び出しキューに追加します。この部分のコードを見てみましょう。
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""pending_work_items から _WorkItems を使用して call_queue を満たします。
この関数はブロックしません。
引数:
pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: work ids の queue.Queue e.g. Queue([5, 6, ...])。Work ids
は消費され、対応する _WorkItems が
pending_work_items から _CallItems に変換され、call_queue に入れられます。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
まず、呼び出しキューがすでに満杯かどうかを判断します。もし満杯であれば、このループを放棄します。次に、work_id
キューから取得し、待機タスクから対応する _WorkItem
インスタンスを取得します。次に、インスタンスにバインドされた Future
インスタンスの set_running_or_notify_cancel
メソッドを呼び出してタスクの状態を設定し、次にそれを呼び出しキューに投入します。
def set_running_or_notify_cancel(self):
"""Future を実行中としてマークするか、キャンセル通知を処理します。
このメソッドは Executor 実装とユニットテストでのみ使用されるべきです。
Future がキャンセルされた場合(cancel() が呼び出されて True を返した場合)、その後
Future の完了を待っているスレッド(as_completed() または wait() の呼び出しを通じて)は通知され、
False が返されます。
Future がキャンセルされなかった場合、実行中の状態に置かれ(future の呼び出し
running() は True を返します)、True が返されます。
このメソッドは、Executor 実装がこの Future に関連する作業を実行する前に呼び出す必要があります。
このメソッドが False を返す場合、その作業は実行されるべきではありません。
戻り値:
Future がキャンセルされた場合は False、そうでない場合は True を返します。
例外:
RuntimeError: このメソッドがすでに呼び出された場合、または set_result()
または set_exception() が呼び出された場合。
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() は必要ありません。なぜなら
# self.cancel() が通知をトリガーするからです。
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self),
self._state)
raise RuntimeError('Future in unexpected state')
この部分の内容は非常にシンプルです。現在のインスタンスが待機状態であれば、True を返し、キャンセル状態であれば、False を返します。_add_call_item_to_queue
関数内では、すでに cancel
状態にある _WorkItem
を待機タスクから削除します。
さて、私たちは _queue_management_worker
関数に戻ります。
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
"""このプロセスとワーカープロセス間の通信を管理します。
この関数はローカルスレッドで実行されます。
executor_reference: このスレッドを所有する ProcessPoolExecutor への weakref.ref
引数:
process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。
このスレッドが終了できるかどうかを判断するために使用されます。
pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: work ids の queue.Queue e.g. Queue([5, 6, ...])。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
"""
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# これは上限です
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# キューのリソースをできるだけ早く解放します。
call_queue.close()
# 作成されたプロセスで .join() が呼ばれない場合、
# 一部の multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
for p in processes.values():
p.join()
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされなければなりません
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが "
"実行中または保留中に突然終了しました。"
))
# オブジェクトへの参照を削除します。See issue16284
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます:キューまたはその
# ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
if isinstance(result_item, int):
# PID を使用してワーカーをクリーンにシャットダウンします
# (エグゼキュータを壊れたとマークしない)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。See issue16284
del work_item
# シャットダウンを開始するべきかどうかを確認します。
executor = executor_reference()
# 次の条件下では新しい作業アイテムを追加できません:
# - インタプリタがシャットダウン中であるか
# - このワーカーを所有するエグゼキュータが収集されたか
# - このワーカーを所有するエグゼキュータがシャットダウンされたか。
if shutting_down():
try:
# 新しい作業アイテムを追加できないので、保留中の作業アイテムがない場合は
# このスレッドをシャットダウンするのが安全です。
if not pending_work_items:
shutdown_worker()
return
except Full:
# これは問題ではありません:最終的には目を覚まし(result_queue.get() で)
# 再びセントinelを送信できるようになります。
pass
executor = None
result_item
変数
私たちが見てみましょう。
まず、皆さんはここで少し疑問を持つかもしれません。
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
この wait
は何でしょうか?reader
は何でしょうか?一歩ずつ来ましょう。まず、前述のように、reader = result_queue._reader
も皆さんを疑問に思わせるでしょう。ここで result_queue
は multiprocess
の SimpleQueue
です。彼女には _reader
メソッドはありません。
class SimpleQueue(object):
def __init__(self, *, ctx):
self._reader, self._writer = connection.Pipe(duplex=False)
self._rlock = ctx.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = ctx.Lock()
上記のコードは SimpleQueue
の一部です。私たちは SimpleQueue
が本質的に Pipe
を使用してプロセス間通信を行っていることを明確に理解できます。そして、_reader
は Pipe
を読み取るための変数です。
注意: 他のプロセス間通信の方法を復習してください。
さて、この部分を理解した後、wait
メソッドを見てみましょう。
def wait(object_list, timeout=None):
'''
object_list のオブジェクトが準備完了/読み取り可能になるまで待機します。
準備完了/読み取り可能な object_list 内のオブジェクトのリストを返します。
'''
with _WaitSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)
if timeout is not None:
deadline = time.time() + timeout
while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
if timeout is not None:
timeout = deadline - time.time()
if timeout < 0:
return ready
この部分のコードは非常にシンプルです。まず、私たちが読み取るべきオブジェクトを登録し、timeout
が None の場合は、オブジェクトがデータを正常に読み取るまで待機し続けます。
さて、私たちは前述の _queue_management_worker
関数に戻ります。
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされなければなりません
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが "
"実行中または保留中に突然終了しました。"
))
# オブジェクトへの参照を削除します。See issue16284
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます:キューまたはその
# ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
私たちは wait
関数を使用して一連のオブジェクトを読み取ります。timeout
を設定していないため、可読オブジェクトの結果を取得したときに、result_queue._reader
がリストに含まれていない場合、処理プロセスが突然異常終了したことを意味します。このとき、私たちは後続の文を実行して現在のプロセスプールのシャットダウン操作を実行します。リストに含まれている場合、データを読み取り、result_item
変数を取得します。
私たちは次のコードを見てみましょう。
if isinstance(result_item, int):
# PID を使用してワーカーをクリーンにシャットダウンします
# (エグゼキュータを壊れたとマークしない)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。See issue16284
del work_item
まず、result_item
変数が int 型である場合、皆さんは _process_worker
関数内に次のようなロジックがあったことを覚えているかもしれません。
call_item = call_queue.get(block=True)
if call_item is None:
# キュー管理スレッドを起こす
result_queue.put(os.getpid())
return
呼び出しキューに新しいタスクがない場合、現在のプロセスの pid
を結果キューに放入します。したがって、result_item
の値が int であれば、以前のタスク処理が完了したことを意味し、私たちはクリーンアップを開始し、プロセスプールをシャットダウンします。
もし result_item
が int でも None でもない場合、必然的に _ResultItem
のインスタンスです。私たちは work_id
に基づいて _WorkItem
インスタンスを取得し、発生した例外または値を _WorkItem
インスタンス内の Future
インスタンス(つまり、私たちが submit した後に返されたもの)にバインドします。
最後に、この work_item
を削除し、完了です。
最後に#
長々としたゴミのような文章を書いてしまいましたが、皆さんが気にしないことを願っています。実際、私たちは concurrent.future
の実装を見ると、特に高深な黒魔法を使用しているわけではありませんが、その中の細部は一つ一つ味わう価値があります。したがって、この文章はここで終わりにします。後で機会があれば、concurrent.future
の他の部分のコードも見てみましょう。味わうべき点がたくさんあります。