ThreadPoolExecutor
¶
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
Deadlocks can occur when the callable associated with a Future waits on the results of another Future. For example:
import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
And:
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
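One way out of this second deadlock, sketched below under the assumption that the submitted callable does not actually need a nested Future, is to do the work inline rather than submitting it back to the same single-worker pool and blocking on its result:

```python
from concurrent.futures import ThreadPoolExecutor

def compute():
    # Do the work directly instead of submitting it back to the same
    # single-worker pool and blocking on a Future that can never run.
    return pow(5, 2)

executor = ThreadPoolExecutor(max_workers=1)
f = executor.submit(compute)
result = f.result()  # completes normally
executor.shutdown()
```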
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
¶
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.
initializer is an optional callable that is called at the start of each worker thread; initargs is a tuple of arguments passed to the initializer. Should initializer raise an exception, all currently pending jobs will raise a BrokenThreadPool, as well as any attempt to submit more jobs to the pool.
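The initializer mechanism can be sketched as follows; the init_worker function and the greeting it stores in thread-local state are illustrative names, not part of the API:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

thread_local = threading.local()

def init_worker(greeting):
    # Runs once in each worker thread, before it handles any task.
    thread_local.greeting = greeting

def task(name):
    # Each task can use the per-thread state set up by the initializer.
    return f"{thread_local.greeting}, {name}"

with ThreadPoolExecutor(max_workers=2, initializer=init_worker,
                        initargs=("Hello",)) as executor:
    results = list(executor.map(task, ["Alice", "Bob"]))
```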
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
Changed in version 3.6: Added the thread_name_prefix parameter to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
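A minimal sketch of the prefix in action — the "downloader" prefix here is an arbitrary example value, and the pool derives each worker thread's name from it:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def worker_name():
    # Report the name of the worker thread running this task.
    return threading.current_thread().name

with ThreadPoolExecutor(max_workers=1,
                        thread_name_prefix="downloader") as executor:
    name = executor.submit(worker_name).result()
```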
Changed in version 3.7: Added the initializer and initargs arguments.
Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

ThreadPoolExecutor now reuses idle worker threads before starting max_workers worker threads too.
Changed in version 3.13: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).
ThreadPoolExecutor Example
¶
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
¶
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
¶
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then a ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available.
mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Contexts and start methods.
initializer is an optional callable that is called at the start of each worker process; initargs is a tuple of arguments passed to the initializer. Should initializer raise an exception, all currently pending jobs will raise a BrokenProcessPool, as well as any attempt to submit more jobs to the pool.
max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None, which means worker processes will live as long as the pool. When a max is specified, the "spawn" multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the "fork" start method.
Changed in version 3.3: When one of the worker processes terminates abruptly, a BrokenProcessPool error is now raised. Previously, behaviour was undefined but operations on the executor or its futures would often freeze or deadlock.
Changed in version 3.7: The mp_context argument was added to allow users to control the start_method for worker processes created by the pool.

Added the initializer and initargs arguments.
Note: The default multiprocessing start method (see Contexts and start methods) will change away from fork in Python 3.14. Code that requires fork to be used for its ProcessPoolExecutor should explicitly specify that by passing a mp_context=multiprocessing.get_context("fork") parameter.
Changed in version 3.11: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.
Changed in version 3.12: On POSIX systems, if your application has multiple threads and the multiprocessing context uses the "fork" start method: The os.fork() function called internally to spawn workers may raise a DeprecationWarning. Pass a mp_context configured to use a different start method. See the os.fork() documentation for further explanation.
Changed in version 3.13: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().
ProcessPoolExecutor Example
¶
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()