Manjusaka

Manjusaka

Python concurrent.future 使用チュートリアルとソースコードの初歩的な分析

ゴミ話#

久しぶりにブログを書いていなかったので、もう水をかけるわけにはいかないと思い、自分に目標を設定しました。concurrent.future に関する内容を書こうと思い、この文章では Python 3.2 で新たに追加された concurrent.future モジュールについてお話しします。

本文#

Python の非同期処理#

ある Python 開発者の小明は、面接中に突然次のような要求を受けました:いくつかのウェブサイトにリクエストを送り、彼らのデータを取得すること。小明は考えました、簡単だ、パパッと書いてみました。


import multiprocessing
import time


def request_url(query_url: str):
    time.sleep(3)  # リクエスト処理ロジック


if __name__ == '__main__':
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url,)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]

簡単ですね。さて、新しい要求が来ました。各リクエストの結果を取得したいのですが、どうすればいいでしょうか?小明は考え、次のようなコードを書きました。


import multiprocessing
import time


def request_url(query_url: str, result_dict: dict):
    time.sleep(3)  # リクエスト処理ロジック
    result_dict[query_url] = {}  # 結果を返す


if __name__ == '__main__':
    process_manager = multiprocessing.Manager()
    result_dict = process_manager.dict()
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url, result_dict)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]
    print(result_dict)

よし、面接官が言いました。「うん、見た目は良いね。さて、もう少し問題を変えよう。まず、主プロセスをブロックしてはいけない。主プロセスはタスクの現在の状態(終了 / 未終了)に基づいて、対応する結果をタイムリーに取得する必要がある。どう改良する?」小明は考えました。「それなら、セマフォを使って、タスクが完了した後に親プロセスに信号を送るのはどうだろう?それとももっと簡単な方法はないのか?」どうやらそれは無さそうです。最後に面接官は心の中で「naive」と言い、顔には笑みを浮かべて小明に帰って待つように言いました。

小明の窮地から、私たちは次のような問題を見出すことができます。私たちが最もよく使用する multiprocessing または threading の 2 つのモジュールは、非同期タスクを実現したい場面では、実は少し不親切です。私たちはしばしば、比較的クリーンに非同期の要求を実現するために、いくつかの追加作業を行う必要があります。このような窮地を解決するために、2009 年 10 月にブライアン・クインラン氏が PEP 3148 を提案しました。この提案では、私たちがよく使用する multiprocessingthreading モジュールをさらにラップし、非同期操作をより良くサポートすることを目指しました。最終的にこの提案は Python 3.2 に導入されました。つまり、今日は concurrent.future についてお話しします。

Future モード#

新しいモジュールについて本格的に話し始める前に、Future モードに関する関連情報を理解する必要があります。

まず、Future モードとは何でしょうか?

Future は実際には生産者 - 消費者モデルの一種の拡張です。生産者 - 消費者モデルでは、生産者は消費者がデータを処理するタイミングや、消費者が処理した結果には関心を持ちません。例えば、私たちはよく次のようなコードを書きます。


import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        
    def run(self):
        while True:
            self.queue.put('one product')
            print(multiprocessing.current_process().name + str(os.getpid()) + ' produced one product, the no of queue now is: %d' %self.queue.qsize())
            sleep(randint(1, 3))
        
        
class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        
    def run(self):
        while True:
            d = self.queue.get(1)
            if d != None:
                print(multiprocessing.current_process().name + str(os.getpid()) + ' consumed  %s, the no of queue now is: %d' %(d,self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break
                
#create queue
queue = multiprocessing.Queue(40)
       
if __name__ == "__main__":
    print('ワクワクする!')
    #create processes    
    processed = []
    for i in range(3):
        processed.append(Producer(queue))
        processed.append(Consumer(queue))
        
    #start processes        
    for i in range(len(processed)):
        processed[i].start()
    
    #join processes    
    for i in range(len(processed)):
        processed[i].join()  

これが生産者 - 消費者モデルの簡単な実装です。私たちは multiprocessingQueue を通信チャネルとして利用し、生産者はキューにデータを投入し、消費者はキューからデータを取得して処理します。しかし、上記のように、このモデルでは生産者は消費者がデータを処理するタイミングや、処理結果には関心を持ちません。一方、Future では、生産者がメッセージ処理の完了を待つことができ、必要に応じて関連する計算結果を取得することもできます。

例えば、次のような Java コードを見てみましょう。

package concurrent;

import java.util.concurrent.Callable;

public class DataProcessThread implements Callable<String> {

	@Override
	public String call() throws Exception {
		// TODO Auto-generated method stub
		Thread.sleep(10000);//データ処理をシミュレート
		return "データ返却";
	}

}

これは私たちがデータを処理するためのコードです。


package concurrent;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MainThread {

	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// TODO Auto-generated method stub
		DataProcessThread dataProcessThread = new DataProcessThread();
		FutureTask<String> future = new FutureTask<String>(dataProcessThread);

		ExecutorService executor = Executors.newFixedThreadPool(1);
		executor.submit(future);

		Thread.sleep(10000);//他の業務処理をシミュレート
		while (true) {
			if (future.isDone()) {
				System.out.println(future.get());
				break;
			}
		}
		executor.shutdown();
	}

}

これは私たちのメインスレッドです。皆さんは、データ処理タスクの状態を簡単に取得できることがわかります。同時に関連する結果も取得できます。

Python における concurrent.futures#

前述のように、Python 3.2 以降、concurrent.futures は組み込みモジュールであり、直接使用できます。

注意: Python 2.7 で concurrent.futures を使用する必要がある場合は、pip でインストールしてください。pip install futures

さて、準備が整ったので、このものをどう使うか見てみましょう。


from concurrent.futures import ProcessPoolExecutor
import time


def return_future_result(message):
    time.sleep(2)
    return message


if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=2)  # 最大2つのタスクを受け入れるプロセスプールを作成
    future1 = pool.submit(return_future_result, ("hello"))  # プロセスプールにタスクを追加
    future2 = pool.submit(return_future_result, ("world"))  # プロセスプールにタスクを追加
    print(future1.done())  # タスク1が終了したかどうかを判断
    time.sleep(3)
    print(future2.done())  # タスク2が終了したかどうかを判断
    print(future1.result())  # タスク1が返した結果を確認
    print(future2.result())  # タスク2が返した結果を確認

まず from concurrent.futures import ProcessPoolExecutorconcurrent.futures から ProcessPoolExecutor をインポートし、後のデータ処理を行います。(concurrent.futures では、私たちに 2 種類の Executor を提供しています。一つは現在使用している ProcessPoolExecutor、もう一つは ThreadPoolExecutor です。これらは外部に公開されているメソッドが一致しており、皆さんは実際のニーズに応じて選択できます。)

次に、最大容量が 2 のプロセスプールを初期化します。そして、プロセスプールの submit メソッドを呼び出してタスクを提出します。面白い点が来ました。submit メソッドを呼び出すと、特別な変数が得られます。この変数は Future クラスのインスタンスであり、将来完了する操作を表します。言い換えれば、submitFuture インスタンスを返すとき、私たちのタスクはまだ完了していない可能性があります。Future インスタンスの done メソッドを呼び出すことで、現在のタスクの実行状態を取得できます。タスクが終了した後、result メソッドを呼び出すことで返された結果を取得できます。もし後続のロジックを実行する際に、何らかの理由でタスクをキャンセルしたい場合は、cancel メソッドを呼び出して現在のタスクをキャンセルできます。

さて、新しい問題が出てきました。たくさんのタスクを提出したい場合はどうすればいいのでしょうか?concurrent.future は私たちがタスクを一括で追加するための map メソッドを提供しています。

import concurrent.futures
import requests

task_url = [('http://www.baidu.com', 40), ('http://example.com/', 40), ('https://www.github.com/', 40)]


def load_url(params: tuple):
    return requests.get(params[0], timeout=params[1]).text


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for url, data in zip(task_url, executor.map(load_url, task_url)):
            print('%r page is %d bytes' % (url, len(data)))

ええ、concurrent.future のスレッド / プロセスプールが提供する map メソッドは、標準ライブラリの map 関数と使用方法が同じです。

concurrent.futures を解析する#

前述のように concurrent.futures の使い方を説明した後、私たちは concurrent.futures がどのように Future モードを実現しているのか、タスクと結果をどのように関連付けているのかに興味を持っています。今から submit メソッドから始めて、ProcessPoolExecutor の実装を簡単に見てみましょう。

まず、ProcessPoolExecutor を初期化する際、その __init__ メソッドでいくつかの重要な変数の初期化操作が行われます。


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """新しい ProcessPoolExecutor インスタンスを初期化します。

        引数:
            max_workers: 指定された呼び出しを実行するために使用できるプロセスの最大数。
            None または指定されていない場合は、マシンのプロセッサ数に応じて
            ワーカープロセスが作成されます。
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers は 0 より大きくなければなりません")

            self._max_workers = max_workers

        # 呼び出しキューのサイズをプロセス数よりも少し大きくして
        # ワーカープロセスがアイドル状態になるのを防ぎます。しかし、あまり大きくしないでください。
        # 呼び出しキュー内の futures はキャンセルできません。
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # 終了したワーカープロセスは、キューの自身のワーカースレッドで
        # スプリアスな "broken pipe" トレースバックを生成する可能性があります。しかし、私たちは
        # 終了したプロセスを検出するので、トレースバックを無視します。
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # pids からプロセスへのマップ
        self._processes = {}

        # シャットダウンは二段階のプロセスです。
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

さて、今日のエントリーポイントである submit メソッドを見てみましょう。


def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool('子プロセスが突然終了しました。'
                'プロセスプールはもはや使用できません')
        if self._shutdown_thread:
            raise RuntimeError('シャットダウン後は新しい futures をスケジュールできません')
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        # キュー管理スレッドを起こす
        self._result_queue.put(None)
        self._start_queue_management_thread()
        return f

まず、渡された引数 fn は私たちの処理関数であり、args および kwargsfn 関数に渡す引数です。submit 関数の最初で、まず _broken_shutdown_thread の値に基づいて、現在のプロセスプール内の処理プロセスの状態と現在のプロセスプールの状態を判断します。もし処理プロセスが突然終了したり、プロセスプールがすでにシャットダウンされている場合、現在の submit 操作を受け付けないことを示す例外がスローされます。

前述の状態に問題がなければ、まず Future クラスをインスタンス化し、そのインスタンスと処理関数および関連する引数を使って _WorkItem クラスをインスタンス化します。次に、インスタンス w を値として、_queue_count をキーとして _pending_work_items に保存します。そして、_start_queue_management_thread メソッドを呼び出して、プロセスプール内の管理スレッドを開始します。今、この部分のコードを見てみましょう。


def _start_queue_management_thread(self):
    # エグゼキュータが失われた場合、weakref コールバックがキュー管理スレッドを起こします。
    def weakref_cb(_, q=self._result_queue):
        q.put(None)

    if self._queue_management_thread is None:
        # プロセスを開始して、そのセントinelが知られるようにします。
        self._adjust_process_count()
        self._queue_management_thread = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        _threads_queues[self._queue_management_thread] = self._result_queue

この部分は非常にシンプルです。まず _adjust_process_count メソッドを実行し、その後 _queue_management_worker メソッドを実行するためのデーモンスレッドを開始します。私たちはいくつかの変数を渡しました。まず、_processes は私たちのプロセスマッピングであり、_pending_work_items には待機中のタスクが保存されています。また、_call_queue_result_queue もあります。さて、もう一つのパラメータは皆さんがあまり理解していないかもしれませんが、それは weakref.ref(self, weakref_cb) です。

まず、Python はガーベジコレクション機構を持つ言語であり、GC (Garbage Collection) 機構があるということは、ほとんどの場合、メモリの割り当てと解放を気にする必要がないことを意味します。Python では、オブジェクトがいつ回収されるかは、その参照カウントによって決まります。参照カウントが 0 になると、そのオブジェクトは回収されます。しかし、交差参照やその他の理由により、オブジェクトの参照カウントが常に 0 にならない場合、そのオブジェクトは回収されず、メモリリークが発生します。したがって、通常の参照とは異なり、Python では弱参照という参照メカニズムが追加されました。弱参照の意味は、ある変数がオブジェクトを保持していても、そのオブジェクトの参照カウントを増やさないことです。したがって、weakref.ref(self, weakref_cb) は大多数の場合、self と等価です(ここでなぜ弱参照を使用するのかについては、別の章で説明します)。

さて、この部分のコードを見終わったので、_queue_management_worker がどのように実装されているか見てみましょう。


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """このプロセスとワーカープロセス間の通信を管理します。

    この関数はローカルスレッドで実行されます。

        executor_reference: このスレッドを所有する ProcessPoolExecutor への weakref.ref
    引数:
        process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。
            このスレッドが終了できるかどうかを判断するために使用されます。
        pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: work ids の queue.Queue e.g. Queue([5, 6, ...])。
        call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
        result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # これは上限です
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # キューのリソースをできるだけ早く解放します。
        call_queue.close()
        # 作成されたプロセスで .join() が呼ばれない場合、
        # 一部の multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # すべてのフューチャーは失敗としてマークされなければなりません
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "プロセスプール内のプロセスが "
                        "実行中または保留中に突然終了しました。"
                    ))
                # オブジェクトへの参照を削除します。See issue16284
                del work_item
            pending_work_items.clear()
            # 残りのワーカーを強制的に終了させます:キューまたはその
            # ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # PID を使用してワーカーをクリーンにシャットダウンします
            # (エグゼキュータを壊れたとマークしない)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # オブジェクトへの参照を削除します。See issue16284
                del work_item
        # シャットダウンを開始するべきかどうかを確認します。
        executor = executor_reference()
        # 次の条件下では新しい作業アイテムを追加できません:
        #   - インタプリタがシャットダウン中であるか
        #   - このワーカーを所有するエグゼキュータが収集されたか
        #   - このワーカーを所有するエグゼキュータがシャットダウンされたか。
        if shutting_down():
            try:
                # 新しい作業アイテムを追加できないので、保留中の作業アイテムがない場合は
                # このスレッドをシャットダウンするのが安全です。
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # これは問題ではありません:最終的には目を覚まし(result_queue.get() で)
                # 再びセントinelを送信できるようになります。
                pass
        executor = None

おなじみの無限ループです。ループの最初のステップでは、_add_call_item_to_queue 関数を使用して、待機キュー内のタスクを呼び出しキューに追加します。この部分のコードを見てみましょう。

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """pending_work_items から _WorkItems を使用して call_queue を満たします。

    この関数はブロックしません。

    引数:
        pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: work ids の queue.Queue e.g. Queue([5, 6, ...])。Work ids
            は消費され、対応する _WorkItems が
            pending_work_items から _CallItems に変換され、call_queue に入れられます。
        call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

まず、呼び出しキューがすでに満杯かどうかを判断します。もし満杯であれば、このループを放棄します。次に、work_id キューから取得し、待機タスクから対応する _WorkItem インスタンスを取得します。次に、インスタンスにバインドされた Future インスタンスの set_running_or_notify_cancel メソッドを呼び出してタスクの状態を設定し、次にそれを呼び出しキューに投入します。


def set_running_or_notify_cancel(self):
    """Future を実行中としてマークするか、キャンセル通知を処理します。

    このメソッドは Executor 実装とユニットテストでのみ使用されるべきです。

    Future がキャンセルされた場合(cancel() が呼び出されて True を返した場合)、その後
    Future の完了を待っているスレッド(as_completed() または wait() の呼び出しを通じて)は通知され、
    False が返されます。

    Future がキャンセルされなかった場合、実行中の状態に置かれ(future の呼び出し
    running() は True を返します)、True が返されます。

    このメソッドは、Executor 実装がこの Future に関連する作業を実行する前に呼び出す必要があります。
    このメソッドが False を返す場合、その作業は実行されるべきではありません。

    戻り値:
        Future がキャンセルされた場合は False、そうでない場合は True を返します。

    例外:
        RuntimeError: このメソッドがすでに呼び出された場合、または set_result()
        または set_exception() が呼び出された場合。
    """
    with self._condition:
        if self._state == CANCELLED:
            self._state = CANCELLED_AND_NOTIFIED
            for waiter in self._waiters:
                waiter.add_cancelled(self)
            # self._condition.notify_all() は必要ありません。なぜなら
            # self.cancel() が通知をトリガーするからです。
            return False
        elif self._state == PENDING:
            self._state = RUNNING
            return True
        else:
            LOGGER.critical('Future %s in unexpected state: %s',
                            id(self),
                            self._state)
            raise RuntimeError('Future in unexpected state')

この部分の内容は非常にシンプルです。現在のインスタンスが待機状態であれば、True を返し、キャンセル状態であれば、False を返します。_add_call_item_to_queue 関数内では、すでに cancel 状態にある _WorkItem を待機タスクから削除します。

さて、私たちは _queue_management_worker 関数に戻ります。


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """このプロセスとワーカープロセス間の通信を管理します。

    この関数はローカルスレッドで実行されます。

        executor_reference: このスレッドを所有する ProcessPoolExecutor への weakref.ref
    引数:
        process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。
            このスレッドが終了できるかどうかを判断するために使用されます。
        pending_work_items: work ids を _WorkItems にマッピングする辞書 e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: work ids の queue.Queue e.g. Queue([5, 6, ...])。
        call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
        result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # これは上限です
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # キューのリソースをできるだけ早く解放します。
        call_queue.close()
        # 作成されたプロセスで .join() が呼ばれない場合、
        # 一部の multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # すべてのフューチャーは失敗としてマークされなければなりません
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "プロセスプール内のプロセスが "
                        "実行中または保留中に突然終了しました。"
                    ))
                # オブジェクトへの参照を削除します。See issue16284
                del work_item
            pending_work_items.clear()
            # 残りのワーカーを強制的に終了させます:キューまたはその
            # ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # PID を使用してワーカーをクリーンにシャットダウンします
            # (エグゼキュータを壊れたとマークしない)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # オブジェクトへの参照を削除します。See issue16284
                del work_item
        # シャットダウンを開始するべきかどうかを確認します。
        executor = executor_reference()
        # 次の条件下では新しい作業アイテムを追加できません:
        #   - インタプリタがシャットダウン中であるか
        #   - このワーカーを所有するエグゼキュータが収集されたか
        #   - このワーカーを所有するエグゼキュータがシャットダウンされたか。
        if shutting_down():
            try:
                # 新しい作業アイテムを追加できないので、保留中の作業アイテムがない場合は
                # このスレッドをシャットダウンするのが安全です。
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # これは問題ではありません:最終的には目を覚まし(result_queue.get() で)
                # 再びセントinelを送信できるようになります。
                pass
        executor = None

result_item 変数

私たちが見てみましょう。

まず、皆さんはここで少し疑問を持つかもしれません。


sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)

この wait は何でしょうか?reader は何でしょうか?一歩ずつ来ましょう。まず、前述のように、reader = result_queue._reader も皆さんを疑問に思わせるでしょう。ここで result_queuemultiprocessSimpleQueue です。彼女には _reader メソッドはありません。


class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

上記のコードは SimpleQueue の一部です。私たちは SimpleQueue が本質的に Pipe を使用してプロセス間通信を行っていることを明確に理解できます。そして、_readerPipe を読み取るための変数です。

注意: 他のプロセス間通信の方法を復習してください。

さて、この部分を理解した後、wait メソッドを見てみましょう。


def wait(object_list, timeout=None):
    '''
    object_list のオブジェクトが準備完了/読み取り可能になるまで待機します。

    準備完了/読み取り可能な object_list 内のオブジェクトのリストを返します。
    '''
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        if timeout is not None:
            deadline = time.time() + timeout

        while True:
            ready = selector.select(timeout)
            if ready:
                return [key.fileobj for (key, events) in ready]
            else:
                if timeout is not None:
                    timeout = deadline - time.time()
                    if timeout < 0:
                        return ready

この部分のコードは非常にシンプルです。まず、私たちが読み取るべきオブジェクトを登録し、timeout が None の場合は、オブジェクトがデータを正常に読み取るまで待機し続けます。

さて、私たちは前述の _queue_management_worker 関数に戻ります。


        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # プロセスプールを壊れたとマークし、すぐに submits を失敗させます。
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # すべてのフューチャーは失敗としてマークされなければなりません
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "プロセスプール内のプロセスが "
                        "実行中または保留中に突然終了しました。"
                    ))
                # オブジェクトへの参照を削除します。See issue16284
                del work_item
            pending_work_items.clear()
            # 残りのワーカーを強制的に終了させます:キューまたはその
            # ロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return

私たちは wait 関数を使用して一連のオブジェクトを読み取ります。timeout を設定していないため、可読オブジェクトの結果を取得したときに、result_queue._reader がリストに含まれていない場合、処理プロセスが突然異常終了したことを意味します。このとき、私たちは後続の文を実行して現在のプロセスプールのシャットダウン操作を実行します。リストに含まれている場合、データを読み取り、result_item 変数を取得します。

私たちは次のコードを見てみましょう。


if isinstance(result_item, int):
    # PID を使用してワーカーをクリーンにシャットダウンします
    # (エグゼキュータを壊れたとマークしない)
    assert shutting_down()
    p = processes.pop(result_item)
    p.join()
    if not processes:
        shutdown_worker()
        return
elif result_item is not None:
    work_item = pending_work_items.pop(result_item.work_id, None)
    # work_item は他のプロセスが終了した場合 None になる可能性があります(上記参照)
    if work_item is not None:
        if result_item.exception:
            work_item.future.set_exception(result_item.exception)
        else:
            work_item.future.set_result(result_item.result)
        # オブジェクトへの参照を削除します。See issue16284
        del work_item

まず、result_item 変数が int 型である場合、皆さんは _process_worker 関数内に次のようなロジックがあったことを覚えているかもしれません。


call_item = call_queue.get(block=True)
if call_item is None:
    # キュー管理スレッドを起こす
    result_queue.put(os.getpid())
    return

呼び出しキューに新しいタスクがない場合、現在のプロセスの pid を結果キューに放入します。したがって、result_item の値が int であれば、以前のタスク処理が完了したことを意味し、私たちはクリーンアップを開始し、プロセスプールをシャットダウンします。

もし result_item が int でも None でもない場合、必然的に _ResultItem のインスタンスです。私たちは work_id に基づいて _WorkItem インスタンスを取得し、発生した例外または値を _WorkItem インスタンス内の Future インスタンス(つまり、私たちが submit した後に返されたもの)にバインドします。

最後に、この work_item を削除し、完了です。

最後に#

長々としたゴミのような文章を書いてしまいましたが、皆さんが気にしないことを願っています。実際、私たちは concurrent.future の実装を見ると、特に高深な黒魔法を使用しているわけではありませんが、その中の細部は一つ一つ味わう価値があります。したがって、この文章はここで終わりにします。後で機会があれば、concurrent.future の他の部分のコードも見てみましょう。味わうべき点がたくさんあります。

参考文献#

1.Python 3 multiprocessing

2.Python 3 weakref

3.並行プログラミングにおける Future モード

4.Python の並行プログラミングにおけるスレッドプール / プロセスプール

5.Future モードの詳細解説(並行使用)

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。