
Async worker queue

AsyncWorkQueue

Implements an asynchronous work queue.

Source code in bionemo/scdl/util/async_worker_queue.py
import concurrent.futures
import threading
from typing import Any, Callable, List, Union


class AsyncWorkQueue:
    """Implements an asynchronous queue."""

    def __init__(self, max_workers: int = 5, use_processes: bool = False) -> None:
        """Initialize the AsyncWorkQueue.

        Args:
            max_workers: The maximum number of worker threads or processes.
            use_processes: If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.
        """
        self.use_processes = use_processes
        if use_processes:
            self.executor: Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor] = (
                concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
            )
        else:
            self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.lock = threading.Lock()
        self.tasks: List[concurrent.futures.Future] = []

    def submit_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
        """Submit a task to the work queue.

        Args:
            func: The function to be executed asynchronously.
            args: Positional arguments to pass to the function.
            kwargs: Keyword arguments to pass to the function.

        Returns:
            Future: A Future object representing the execution of the function.
        """
        with self.lock:
            future = self.executor.submit(func, *args, **kwargs)
            self.tasks.append(future)
            return future

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the executor and wait for the tasks to complete.

        Args:
            wait: If True, wait for all tasks to complete before shutting down.
        """
        self.executor.shutdown(wait=wait)

    def get_completed_tasks(self) -> List[concurrent.futures.Future]:
        """Get the list of completed tasks.

        Returns:
            A list of Future objects that are completed.
        """
        with self.lock:
            completed_tasks = [task for task in self.tasks if task.done()]
            return completed_tasks

    def get_pending_tasks(self) -> List[concurrent.futures.Future]:
        """Get the list of pending tasks.

        Returns:
            A list of Future objects that are not yet completed.
        """
        with self.lock:
            pending_tasks = [task for task in self.tasks if not task.done()]
            return pending_tasks

    def get_task_results(self) -> List[Any]:
        """Get the results of all completed tasks.

        Returns:
            A list of results from the completed tasks.

        Raises:
            Exception: This would be expected if the task fails to complete or
            if it is cancelled.
        """
        completed_tasks = self.get_completed_tasks()
        results = []
        for task in completed_tasks:
            try:
                results.append(task.result())
            except Exception as e:
                results.append(e)
        return results

    def wait(self) -> List[Any]:
        """Wait for all submitted tasks to complete and return their results.

        Returns:
            A list of results from all completed tasks.
        """
        # Wait for all tasks to complete
        concurrent.futures.wait(self.tasks)

        # Collect results from all tasks
        results = []
        for task in self.tasks:
            try:
                results.append(task.result())
            except Exception as e:
                results.append(e)

        return results
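
A minimal usage sketch is shown below, assuming AsyncWorkQueue is importable from bionemo.scdl.util.async_worker_queue (the source path shown above); the worker function slow_square is a hypothetical example.

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def slow_square(x: int) -> int:
    """Hypothetical worker function used for illustration."""
    return x * x


# Thread-backed queue with a small pool of workers.
queue = AsyncWorkQueue(max_workers=2, use_processes=False)

# Submit a few tasks; each call returns a concurrent.futures.Future.
for i in range(4):
    queue.submit_task(slow_square, i)

# Block until every submitted task has finished and collect the results.
print(queue.wait())  # [0, 1, 4, 9]

# Release the worker threads.
queue.shutdown(wait=True)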

__init__(max_workers=5, use_processes=False)

Initialize the AsyncWorkQueue.

Parameters

Name            Type   Description                                                             Default
max_workers     int    The maximum number of worker threads or processes.                     5
use_processes   bool   If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.   False
Source code in bionemo/scdl/util/async_worker_queue.py
def __init__(self, max_workers: int = 5, use_processes: bool = False) -> None:
    """Initialize the AsyncWorkQueue.

    Args:
        max_workers: The maximum number of worker threads or processes.
        use_processes: If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.
    """
    self.use_processes = use_processes
    if use_processes:
        self.executor: Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor] = (
            concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
        )
    else:
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    self.lock = threading.Lock()
    self.tasks: List[concurrent.futures.Future] = []
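
A brief, hedged sketch of the two constructor modes: the default use_processes=False builds a ThreadPoolExecutor, suited to I/O-bound work, while use_processes=True builds a ProcessPoolExecutor, which requires submitted callables and their arguments to be picklable. The import path follows the source location above.

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue

# Thread pool: suitable for I/O-bound tasks such as reading files.
io_queue = AsyncWorkQueue(max_workers=8, use_processes=False)

# Process pool: suitable for CPU-bound tasks; submitted callables
# and their arguments must be picklable (e.g. module-level functions).
cpu_queue = AsyncWorkQueue(max_workers=4, use_processes=True)

io_queue.shutdown()
cpu_queue.shutdown()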

get_completed_tasks()

Get the list of completed tasks.

Returns

Type           Description
List[Future]   A list of Future objects that are completed.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_completed_tasks(self) -> List[concurrent.futures.Future]:
    """Get the list of completed tasks.

    Returns:
        A list of Future objects that are completed.
    """
    with self.lock:
        completed_tasks = [task for task in self.tasks if task.done()]
        return completed_tasks
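
A short, hedged polling example; sleepy_task and the timings below are illustrative only, so the exact count of completed futures may vary.

import time

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def sleepy_task(seconds: float) -> float:
    """Hypothetical task that simulates slow work."""
    time.sleep(seconds)
    return seconds


queue = AsyncWorkQueue(max_workers=2)
queue.submit_task(sleepy_task, 0.01)
queue.submit_task(sleepy_task, 1.0)

time.sleep(0.1)
# Only futures whose work has already finished are returned.
print(len(queue.get_completed_tasks()))  # likely 1 at this point
queue.shutdown()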

get_pending_tasks()

Get the list of pending tasks.

Returns

Type           Description
List[Future]   A list of Future objects that are not yet completed.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_pending_tasks(self) -> List[concurrent.futures.Future]:
    """Get the list of pending tasks.

    Returns:
        A list of Future objects that are not yet completed.
    """
    with self.lock:
        pending_tasks = [task for task in self.tasks if not task.done()]
        return pending_tasks
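
A hedged sketch of using the pending list for simple progress reporting; sleepy_task is a hypothetical worker.

import time

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def sleepy_task(seconds: float) -> float:
    """Hypothetical task that simulates slow work."""
    time.sleep(seconds)
    return seconds


queue = AsyncWorkQueue(max_workers=2)
for _ in range(4):
    queue.submit_task(sleepy_task, 0.2)

# Poll until nothing is left in flight.
while queue.get_pending_tasks():
    print(f"{len(queue.get_pending_tasks())} task(s) still running")
    time.sleep(0.1)
queue.shutdown()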

get_task_results()

Get the results of all completed tasks.

Returns

Type        Description
List[Any]   A list of results from the completed tasks.

Raises

Type        Description
Exception   Raised if a task fails to complete or is cancelled.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_task_results(self) -> List[Any]:
    """Get the results of all completed tasks.

    Returns:
        A list of results from the completed tasks.

    Raises:
        Exception: This would be expected if the task fails to complete or
        if it is cancelled.
    """
    completed_tasks = self.get_completed_tasks()
    results = []
    for task in completed_tasks:
        try:
            results.append(task.result())
        except Exception as e:
            results.append(e)
    return results
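
Note that the loop above catches exceptions from individual futures and appends the exception object to the returned list rather than re-raising it, so callers should check the type of each entry. A hedged sketch with a deliberately failing hypothetical task:

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def may_fail(x: int) -> int:
    """Hypothetical task that raises for x == 0."""
    if x == 0:
        raise ValueError("x must be non-zero")
    return 10 // x


queue = AsyncWorkQueue(max_workers=2)
queue.submit_task(may_fail, 2)
queue.submit_task(may_fail, 0)
queue.wait()  # make sure both tasks have finished

for result in queue.get_task_results():
    if isinstance(result, Exception):
        print("task failed:", result)
    else:
        print("task result:", result)
queue.shutdown()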

shutdown(wait=True)

Shutdown the executor and wait for the tasks to complete.

Parameters

Name   Type   Description                                                     Default
wait   bool   If True, wait for all tasks to complete before shutting down.   True
Source code in bionemo/scdl/util/async_worker_queue.py
def shutdown(self, wait: bool = True) -> None:
    """Shutdown the executor and wait for the tasks to complete.

    Args:
        wait: If True, wait for all tasks to complete before shutting down.
    """
    self.executor.shutdown(wait=wait)
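
A hedged sketch of the two shutdown modes; the behaviour mirrors concurrent.futures.Executor.shutdown, on which this method is a thin wrapper.

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue

queue = AsyncWorkQueue(max_workers=2)
queue.submit_task(print, "hello from a worker")

# Block until the submitted task has run, then free the workers.
queue.shutdown(wait=True)

# Alternatively, shut down without blocking: no new tasks can be
# submitted afterwards, but tasks already queued still complete.
# queue.shutdown(wait=False)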

submit_task(func, *args, **kwargs)

Submit a task to the work queue.

Parameters

Name     Type                 Description                                      Default
func     Callable[..., Any]   The function to be executed asynchronously.      required
args     Any                  Positional arguments to pass to the function.    ()
kwargs   Any                  Keyword arguments to pass to the function.       {}

Returns

Name     Type     Description
Future   Future   A Future object representing the execution of the function.

Source code in bionemo/scdl/util/async_worker_queue.py
def submit_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
    """Submit a task to the work queue.

    Args:
        func: The function to be executed asynchronously.
        args: Positional arguments to pass to the function.
        kwargs: Keyword arguments to pass to the function.

    Returns:
        Future: A Future object representing the execution of the function.
    """
    with self.lock:
        future = self.executor.submit(func, *args, **kwargs)
        self.tasks.append(future)
        return future
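
A hedged sketch of forwarding positional and keyword arguments; the returned object is a standard concurrent.futures.Future, so result() and add_done_callback() behave as in the standard library. The function scale is hypothetical.

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def scale(value: float, factor: float = 1.0) -> float:
    """Hypothetical task used for illustration."""
    return value * factor


queue = AsyncWorkQueue(max_workers=2)

# Positional and keyword arguments are forwarded to the callable.
future = queue.submit_task(scale, 3.0, factor=2.0)

# Block on this one future only; other submitted tasks keep running.
print(future.result())  # 6.0
queue.shutdown()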

wait()

Wait for all submitted tasks to complete and return their results.

Returns

Type        Description
List[Any]   A list of results from all completed tasks.

Source code in bionemo/scdl/util/async_worker_queue.py
def wait(self) -> List[Any]:
    """Wait for all submitted tasks to complete and return their results.

    Returns:
        A list of results from all completed tasks.
    """
    # Wait for all tasks to complete
    concurrent.futures.wait(self.tasks)

    # Collect results from all tasks
    results = []
    for task in self.tasks:
        try:
            results.append(task.result())
        except Exception as e:
            results.append(e)

    return results
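
A hedged sketch of wait() as a one-call barrier over everything submitted so far; as in get_task_results, a failed task contributes its exception object to the returned list instead of raising.

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue

queue = AsyncWorkQueue(max_workers=4)
for i in range(5):
    queue.submit_task(pow, i, 2)

# Blocks until every submitted future is done, then gathers results
# (or exception objects) in submission order.
print(queue.wait())  # [0, 1, 4, 9, 16]
queue.shutdown()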