Warning
If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.
Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
For an example of the usage of queues for interprocess communication see Examples.
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual queue.Empty and queue.Full exceptions from the standard library's queue module are raised to signal timeouts.
Queue implements all the methods of queue.Queue except for task_done() and join().
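For illustration, here is a minimal sketch (not taken from the library's own examples) of passing a result back from a child process through a Queue; the worker function name is our own:

    from multiprocessing import Process, Queue

    def worker(q):
        q.put('hello from the child')   # items must be picklable

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=worker, args=(q,))
        p.start()
        print(q.get())   # blocks until the child has put an item
        p.join()

Note that the item is retrieved before joining the child, which avoids the queue-related deadlock described above.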
Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.
Note that this may raise NotImplementedError on platforms like macOS where sem_getvalue() is not implemented.
Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).
Changed in version 3.8: If the queue is closed, ValueError is raised instead of AssertionError.
Equivalent to put(obj, False).
Remove and return an item from the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.
Equivalent to get(False).
multiprocessing.Queue has a few additional methods not found in queue.Queue. These methods are usually unnecessary for most code:
Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.
Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
By default if a process is not the creator of the queue then on exit it will attempt to join the queue's background thread. The process can call cancel_join_thread() to make join_thread() do nothing.
Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits – see join_thread().
A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to be lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don't care about lost data.
注意
This class’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the functionality in this class will be disabled, and attempts to instantiate a
Queue
will result in an
ImportError
。见
bpo-3770
for additional information. The same holds true for any of the specialized queue types listed below.
Close the queue: release internal resources.
A queue must not be used anymore after it is closed. For example, the get(), put() and empty() methods must no longer be called.
Added in version 3.9.
Return True if the queue is empty, False otherwise.
Remove and return an item from the queue.
Put item into the queue.
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
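A minimal sketch of this task-accounting pattern, assuming a daemonic consumer process (the names here are illustrative):

    from multiprocessing import Process, JoinableQueue

    def consumer(q):
        while True:
            item = q.get()
            print('processed', item)
            q.task_done()            # one call per get()

    if __name__ == '__main__':
        q = JoinableQueue()
        Process(target=consumer, args=(q,), daemon=True).start()
        for i in range(3):
            q.put(i)
        q.join()                     # returns once every item is task_done()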
Return a list of all live children of the current process.
Calling this has the side effect of "joining" any processes which have already finished.
Return the number of CPUs in the system. This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)). When the number of CPUs cannot be determined, a NotImplementedError is raised.
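For example, the difference between the two counts can be observed as follows (os.sched_getaffinity() is not available on every platform, hence the guard):

    import multiprocessing
    import os

    print(multiprocessing.cpu_count())            # CPUs in the system
    if hasattr(os, 'sched_getaffinity'):
        print(len(os.sched_getaffinity(0)))       # CPUs usable by this process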
See also
Return the Process object corresponding to the current process. An analogue of threading.current_thread().
Return the Process object corresponding to the parent process of the current_process(). For the main process, parent_process will be None.
Added in version 3.8.
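A short illustrative sketch (the function name is our own) showing both calls; parent_process() requires Python 3.8+:

    from multiprocessing import Process, current_process, parent_process

    def child():
        print('I am', current_process().name)
        print('my parent is', parent_process().name)

    if __name__ == '__main__':
        print(parent_process())      # None in the main process
        p = Process(target=child)
        p.start()
        p.join()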
Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)
One needs to call this function straight after the if __name__ == '__main__' line of the main module. For example:
    from multiprocessing import Process, freeze_support

    def f():
        print('hello world!')

    if __name__ == '__main__':
        freeze_support()
        Process(target=f).start()
If the freeze_support() line is omitted then trying to run the frozen executable will raise RuntimeError.
Calling freeze_support() has no effect when invoked on any operating system other than Windows. In addition, if the module is being run normally by the Python interpreter on Windows (the program has not been frozen), then freeze_support() has no effect.
Returns a list of the supported start methods, the first of which is the default. The possible start methods are 'fork', 'spawn' and 'forkserver'. Not all platforms support all methods. See Contexts and start methods.
Added in version 3.4.
Return a context object which has the same attributes as the multiprocessing module.
If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn' or 'forkserver'. ValueError is raised if the specified start method is not available. See Contexts and start methods.
Added in version 3.4.
Return the name of the start method used for starting processes.
If the start method has not been fixed and allow_none is false, then the start method is fixed to the default and the name is returned. If the start method has not been fixed and allow_none is true then None is returned.
The return value can be 'fork', 'spawn', 'forkserver' or None. See Contexts and start methods.
Added in version 3.4.
Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.
Set the path of the Python interpreter to use when starting a child process. (By default sys.executable is used.) Embedders will probably need to do something like set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe')) before they can create child processes.
Changed in version 3.4: Now supported on POSIX when the 'spawn' start method is used.
Changed in version 3.11: Accepts a path-like object.
Set a list of module names for the forkserver main process to attempt to import so that their already imported state is inherited by forked processes. Any ImportError when doing so is silently ignored. This can be used as a performance enhancement to avoid repeated work in every process.
For this to work, it must be called before the forkserver process has been launched (before creating a Pool or starting a Process).
Only meaningful when using the 'forkserver' start method. See Contexts and start methods.
Added in version 3.4.
Set the method which should be used to start child processes. The method argument can be 'fork', 'spawn' or 'forkserver'. Raises RuntimeError if the start method has already been set and force is not True. If method is None and force is True then the start method is set to None. If method is None and force is False then the context is set to the default context.
Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module.
See Contexts and start methods.
Added in version 3.4.
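A minimal sketch of selecting a start method once, in the main module (mirroring the pattern described above; the function name is illustrative):

    import multiprocessing as mp

    def foo(q):
        q.put('hello')

    if __name__ == '__main__':
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()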
Note
multiprocessing contains no analogues of threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer, or threading.local.
Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets.
Connection objects are usually created using Pipe – see also Listeners and Clients.
Send an object to the other end of the connection which should be read using recv().
The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
Return an object sent from the other end of the connection using send(). Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.
Return the file descriptor or handle used by the connection.
Close the connection.
This is called automatically when the connection is garbage collected.
Return whether there is any data available to be read.
If timeout is not specified then it will return immediately. If timeout is a number then this specifies the maximum time in seconds to block. If timeout is None then an infinite timeout is used.
Note that multiple connection objects may be polled at once by using multiprocessing.connection.wait().
Send byte data from a bytes-like object as a complete message.
If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
Return a complete message of byte data sent from the other end of the connection as a string. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end has closed.
If maxlength is specified and the message is longer than maxlength then OSError is raised and the connection will no longer be readable.
Read into buffer a complete message of byte data sent from the other end of the connection and return the number of bytes in the message. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.
buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).
If the buffer is too short then a BufferTooShort exception is raised and the complete message is available as e.args[0] where e is the exception instance.
Changed in version 3.3: Connection objects themselves can now be transferred between processes using Connection.send() and Connection.recv().
Connection objects also now support the context management protocol – see Context Manager Types. __enter__() returns the connection object, and __exit__() calls close().
For example:
    >>> from multiprocessing import Pipe
    >>> a, b = Pipe()
    >>> a.send([1, 'hello', None])
    >>> b.recv()
    [1, 'hello', None]
    >>> b.send_bytes(b'thank you')
    >>> a.recv_bytes()
    b'thank you'
    >>> import array
    >>> arr1 = array.array('i', range(5))
    >>> arr2 = array.array('i', [0] * 10)
    >>> a.send_bytes(arr1)
    >>> count = b.recv_bytes_into(arr2)
    >>> assert count == len(arr1) * arr1.itemsize
    >>> arr2
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warning
The Connection.recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message.
Therefore, unless the connection object was produced using Pipe() you should only use the recv() and send() methods after performing some sort of authentication. See Authentication keys.
Warning
If a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted, because it may become impossible to be sure where the message boundaries lie.
Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program. See the documentation for the threading module.
Note that one can also create synchronization primitives by using a manager object – see Managers.
A barrier object: a clone of threading.Barrier.
Added in version 3.3.
A bounded semaphore object: a close analog of threading.BoundedSemaphore.
A solitary difference from its close analog exists: its acquire method's first argument is named block, as is consistent with Lock.acquire().
Note
On macOS, this is indistinguishable from Semaphore because sem_getvalue() is not implemented on that platform.
A condition variable: an alias for threading.Condition.
If lock is specified then it should be a Lock or RLock object from multiprocessing.
Changed in version 3.3: The wait_for() method was added.
A clone of threading.Event.
A non-recursive lock object: a close analog of threading.Lock. Once a process or thread has acquired a lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it. The concepts and behaviors of threading.Lock as it applies to threads are replicated here in multiprocessing.Lock as it applies to either processes or threads, except as noted.
Note that Lock is actually a factory function which returns an instance of multiprocessing.synchronize.Lock initialized with a default context.
Lock supports the context manager protocol and thus may be used in with statements.
Acquire a lock, blocking or non-blocking.
With the block argument set to True (the default), the method call will block until the lock is in an unlocked state, then set it to locked and return True. Note that the name of this first argument differs from that in threading.Lock.acquire().
With the block argument set to False, the method call does not block. If the lock is currently in a locked state, return False; otherwise set the lock to a locked state and return True.
When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock can not be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of None (the default) set the timeout period to infinite. Note that the treatment of negative or None values for timeout differs from the implemented behavior in threading.Lock.acquire(). The timeout argument has no practical implications if the block argument is set to False and is thus ignored. Returns True if the lock has been acquired or False if the timeout period has elapsed.
Release a lock. This can be called from any process or thread, not only the process or thread which originally acquired the lock.
Behavior is the same as in threading.Lock.release() except that when invoked on an unlocked lock, a ValueError is raised.
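Since Lock supports the context manager protocol, the acquire/release pairing is usually written with a with statement. A hedged sketch (the worker function is illustrative):

    from multiprocessing import Process, Lock

    def f(lock, n):
        with lock:                   # acquire() on entry, release() on exit
            print('worker', n)

    if __name__ == '__main__':
        lock = Lock()
        for i in range(4):
            Process(target=f, args=(lock, i)).start()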
A recursive lock object: a close analog of threading.RLock. A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.
Note that RLock is actually a factory function which returns an instance of multiprocessing.synchronize.RLock initialized with a default context.
RLock supports the context manager protocol and thus may be used in with statements.
Acquire a lock, blocking or non-blocking.
When invoked with the block argument set to True, block until the lock is in an unlocked state (not owned by any process or thread) unless the lock is already owned by the current process or thread. The current process or thread then takes ownership of the lock (if it does not already have ownership) and the recursion level inside the lock increments by one, resulting in a return value of True. Note that there are several differences in this first argument's behavior compared to the implementation of threading.RLock.acquire(), starting with the name of the argument itself.
When invoked with the block argument set to False, do not block. If the lock has already been acquired (and thus is owned) by another process or thread, the current process or thread does not take ownership and the recursion level within the lock is not changed, resulting in a return value of False. If the lock is in an unlocked state, the current process or thread takes ownership and the recursion level is incremented, resulting in a return value of True.
Use and behaviors of the timeout argument are the same as in Lock.acquire(). Note that some of these behaviors of timeout differ from the implemented behaviors in threading.RLock.acquire().
Release a lock, decrementing the recursion level. If after the decrement the recursion level is zero, reset the lock to unlocked (not owned by any process or thread) and if any other processes or threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling process or thread.
Only call this method when the calling process or thread owns the lock. An AssertionError is raised if this method is called by a process or thread other than the owner or if the lock is in an unlocked (unowned) state. Note that the type of exception raised in this situation differs from the implemented behavior in threading.RLock.release().
A semaphore object: a close analog of threading.Semaphore.
A solitary difference from its close analog exists: its acquire method's first argument is named block, as is consistent with Lock.acquire().
Note
On macOS, sem_timedwait is unsupported, so calling acquire() with a timeout will emulate that function's behavior using a sleeping loop.
Note
If the SIGINT signal generated by Ctrl-C arrives while the main thread is blocked by a call to BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() or Condition.wait() then the call will be immediately interrupted and KeyboardInterrupt will be raised.
This differs from the behaviour of threading where SIGINT will be ignored while the equivalent blocking calls are in progress.
Note
Some of this package's functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See bpo-3770 for additional information.
Shared ctypes Objects ¶
It is possible to create shared objects using shared memory which can be inherited by child processes.
Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object. The object itself can be accessed via the value attribute of a Value.
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be "process-safe".
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

    counter.value += 1

Assuming the associated lock is recursive (which it is by default) you can instead do

    with counter.get_lock():
        counter.value += 1

Note that lock is a keyword-only argument.
Return a ctypes array allocated from shared memory. By default the return value is actually a synchronized wrapper for the array.
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer, then it determines the length of the array, and the array will be initially zeroed. Otherwise, size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be "process-safe".
Note that lock is a keyword-only argument.
Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings.
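A short sketch of Value and Array in use (this mirrors the common pattern; the function name is our own):

    from multiprocessing import Process, Value, Array

    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]

    if __name__ == '__main__':
        num = Value('d', 0.0)          # 'd' is a double typecode
        arr = Array('i', range(10))    # 'i' is a signed int typecode
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
        print(num.value)               # 3.1415927
        print(arr[:])                  # [0, -1, -2, ..., -9]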
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.
Manager processes will be shutdown as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:
Create a BaseManager object.
Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.
address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.
authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a byte string.
serializer must be 'pickle' (use pickle serialization) or 'xmlrpclib' (use xmlrpc.client serialization).
ctx is a context object, or None (use the current context). See the get_context() function.
shutdown_timeout is a timeout in seconds used to wait until the process used by the manager completes in the shutdown() method. If the shutdown times out, the process is terminated. If terminating the process also times out, the process is killed.
Changed in version 3.11: Added the shutdown_timeout parameter.
Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.
Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method:
    >>> from multiprocessing.managers import BaseManager
    >>> manager = BaseManager(address=('', 50000), authkey=b'abc')
    >>> server = manager.get_server()
    >>> server.serve_forever()
Server additionally has an address attribute.
Connect a local manager object to a remote manager process:
    >>> from multiprocessing.managers import BaseManager
    >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
    >>> m.connect()
A classmethod which can be used for registering a type or callable with the manager class.
typeid is a “type identifier” which is used to identify a particular type of shared object. This must be a string.
callable is a callable used for creating objects for this type identifier. If a manager instance will be connected to the server using the connect() method, or if the create_method argument is False then this can be left as None.
proxytype is a subclass of BaseProxy which is used to create proxies for shared objects with this typeid. If None then a proxy class is created automatically.
exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). (If exposed is None then proxytype._exposed_ is used instead if it exists.) In the case where no exposed list is specified, all "public methods" of the shared object will be accessible. (Here a "public method" means any attribute which has a __call__() method and whose name does not begin with '_'.)
method_to_typeid is a mapping used to specify the return type of those exposed methods which should return a proxy. It maps method names to typeid strings. (If method_to_typeid is None then proxytype._method_to_typeid_ is used instead if it exists.) If a method's name is not a key of this mapping or if the mapping is None then the object returned by the method will be copied by value.
create_method determines whether a method should be created with name typeid which can be used to tell the server process to create a new shared object and return a proxy for it. By default it is True.
BaseManager instances also have one read-only property:
The address used by the manager.
Changed in version 3.3: Manager objects support the context management protocol – see Context Manager Types. __enter__() starts the server process (if it has not already started) and then returns the manager object. __exit__() calls shutdown().
In previous versions __enter__() did not start the manager's server process if it was not already started.
A subclass of BaseManager which can be used for the synchronization of processes. Objects of this type are returned by multiprocessing.Manager().
Its methods create and return proxy objects for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries.
Create a shared threading.Barrier object and return a proxy for it.
Added in version 3.3.
Create a shared threading.BoundedSemaphore object and return a proxy for it.
Create a shared threading.Condition object and return a proxy for it.
If lock is supplied then it should be a proxy for a threading.Lock or threading.RLock object.
Changed in version 3.3: The wait_for() method was added.
Create a shared threading.Event object and return a proxy for it.
Create a shared threading.Lock object and return a proxy for it.
Create a shared queue.Queue object and return a proxy for it.
Create a shared threading.RLock object and return a proxy for it.
Create a shared threading.Semaphore object and return a proxy for it.
Create an array and return a proxy for it.
Create an object with a writable value attribute and return a proxy for it.
Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.
A type that can register with SyncManager.
A namespace object has no public methods, but does have writable attributes. Its representation shows the values of its attributes.
However, when using a proxy for a namespace object, an attribute beginning with '_' will be an attribute of the proxy and not an attribute of the referent:
    >>> mp_context = multiprocessing.get_context('spawn')
    >>> manager = mp_context.Manager()
    >>> Global = manager.Namespace()
    >>> Global.x = 10
    >>> Global.y = 'hello'
    >>> Global._z = 12.3    # this is an attribute of the proxy
    >>> print(Global)
    Namespace(x=10, y='hello')
To create one's own manager, one creates a subclass of BaseManager and uses the register() classmethod to register new types or callables with the manager class. For example:
    from multiprocessing.managers import BaseManager

    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y

    class MyManager(BaseManager):
        pass

    MyManager.register('Maths', MathsClass)

    if __name__ == '__main__':
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))         # prints 56
It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it).
Running the following commands creates a server for a single shared queue which remote clients can access:
    >>> from multiprocessing.managers import BaseManager
    >>> from queue import Queue
    >>> queue = Queue()
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue', callable=lambda: queue)
    >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    >>> s = m.get_server()
    >>> s.serve_forever()
One client can access the server as follows:
    >>> from multiprocessing.managers import BaseManager
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue')
    >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
    >>> m.connect()
    >>> queue = m.get_queue()
    >>> queue.put('hello')
Another client can also use it:
    >>> from multiprocessing.managers import BaseManager
    >>> class QueueManager(BaseManager): pass
    >>> QueueManager.register('get_queue')
    >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
    >>> m.connect()
    >>> queue = m.get_queue()
    >>> queue.get()
    'hello'
Local processes can also access that queue, using the code from above on the client to access it remotely:
    >>> from multiprocessing import Process, Queue
    >>> from multiprocessing.managers import BaseManager
    >>> class Worker(Process):
    ...     def __init__(self, q):
    ...         self.q = q
    ...         super().__init__()
    ...     def run(self):
    ...         self.q.put('local hello')
    ...
    >>> queue = Queue()
    >>> w = Worker(queue)
    >>> w.start()
    >>> class QueueManager(BaseManager): pass
    ...
    >>> QueueManager.register('get_queue', callable=lambda: queue)
    >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    >>> s = m.get_server()
    >>> s.serve_forever()
A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.
A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). In this way, a proxy can be used just like its referent can:
    >>> mp_context = multiprocessing.get_context('spawn')
    >>> manager = mp_context.Manager()
    >>> l = manager.list([i*i for i in range(10)])
    >>> print(l)
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    >>> print(repr(l))
    <ListProxy object, typeid 'list' at 0x...>
    >>> l[4]
    16
    >>> l[2:5]
    [4, 9, 16]
Notice that applying str() to a proxy will return the representation of the referent, whereas applying repr() will return the representation of the proxy.
An important feature of proxy objects is that they are picklable so they can be passed between processes. As such, a referent can contain proxy objects. This permits nesting of these managed lists, dicts, and other proxy objects:
    >>> a = manager.list()
    >>> b = manager.list()
    >>> a.append(b)         # referent of a now contains referent of b
    >>> print(a, b)
    [<ListProxy object, typeid 'list' at ...>] []
    >>> b.append('hello')
    >>> print(a[0], b)
    ['hello'] ['hello']
Similarly, dict and list proxies may be nested inside one another:
    >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
    >>> d_first_inner = l_outer[0]
    >>> d_first_inner['a'] = 1
    >>> d_first_inner['b'] = 2
    >>> l_outer[1]['c'] = 3
    >>> l_outer[1]['z'] = 26
    >>> print(l_outer[0])
    {'a': 1, 'b': 2}
    >>> print(l_outer[1])
    {'c': 3, 'z': 26}
If standard (non-proxy)
list
or
dict
objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a
__setitem__
on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy:
    # create a list proxy and append a mutable object (a dictionary)
    lproxy = manager.list()
    lproxy.append({})
    # now mutate the dictionary
    d = lproxy[0]
    d['a'] = 1
    d['b'] = 2
    # at this point, the changes to d are not yet synced, but by
    # updating the dictionary, the proxy is notified of the change
    lproxy[0] = d
This approach is perhaps less convenient than employing nested proxy objects for most use cases but also demonstrates a level of control over the synchronization.
Note
The proxy types in multiprocessing do nothing to support comparisons by value. So, for instance, we have:
    >>> manager.list([1,2,3]) == [1,2,3]
    False
One should just use a copy of the referent instead when making comparisons.
Proxy objects are instances of subclasses of BaseProxy.
Call and return the result of a method of the proxy’s referent.
If proxy is a proxy whose referent is obj then the expression proxy._callmethod(methodname, args, kwds) will evaluate the expression getattr(obj, methodname)(*args, **kwds) in the manager's process.
The returned value will be a copy of the result of the call or a proxy to a new shared object – see documentation for the method_to_typeid argument of BaseManager.register().
If an exception is raised by the call, then it is re-raised by _callmethod(). If some other exception is raised in the manager's process then this is converted into a RemoteError exception and is raised by _callmethod().
Note in particular that an exception will be raised if methodname has not been exposed.
An example of the usage of _callmethod():
    >>> l = manager.list(range(10))
    >>> l._callmethod('__len__')
    10
    >>> l._callmethod('__getitem__', (slice(2, 7),))  # equivalent to l[2:7]
    [2, 3, 4, 5, 6]
    >>> l._callmethod('__getitem__', (20,))           # equivalent to l[20]
    Traceback (most recent call last):
    ...
    IndexError: list index out of range
Return a copy of the referent.
If the referent is unpicklable then this will raise an exception.
Return a representation of the proxy object.
Return a representation of the referent.
A proxy object uses a weakref callback so that when it gets garbage collected it deregisters itself from the manager which owns its referent.
A shared object gets deleted from the manager process when there are no longer any proxies referring to it.
One can create a pool of processes which will carry out tasks submitted to it with the Pool class.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
processes is the number of worker processes to use. If processes is None then the number returned by os.cpu_count() is used.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
context can be used to specify the context used for starting the worker processes. Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately.
Note that the methods of the pool object should only be called by the process which created the pool.
Warning
multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization.
Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
Changed in version 3.2: Added the maxtasksperchild parameter.
Changed in version 3.4: Added the context parameter.
Note
Worker processes within a Pool typically live for the complete duration of the Pool's work queue. A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before exiting, being cleaned up and having a new process spawned to replace the old one. The maxtasksperchild argument to Pool exposes this ability to the end user.
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocking, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
A variant of the apply() method which returns an AsyncResult object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
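A minimal sketch of apply_async() with both callbacks (the function names are illustrative); note that the callbacks run in a result-handling thread of the parent process:

    from multiprocessing import Pool

    def square(x):
        return x * x

    def on_result(value):
        print('got', value)

    def on_error(exc):
        print('failed:', exc)

    if __name__ == '__main__':
        with Pool(2) as pool:
            r = pool.apply_async(square, (7,),
                                 callback=on_result,
                                 error_callback=on_error)
            r.wait()   # 'got 49' is printed by the callback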
A parallel equivalent of the map() built-in function (it supports only one iterable argument though, for multiple iterables see starmap()). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with an explicit chunksize option for better efficiency.
A variant of the map() method which returns an AsyncResult object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
A lazier version of map().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".)
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.
Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
Added in version 3.3.
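For example, a hedged sketch of starmap() unpacking argument tuples (the function name is our own):

    from multiprocessing import Pool

    def add(a, b):
        return a + b

    if __name__ == '__main__':
        with Pool(2) as pool:
            print(pool.starmap(add, [(1, 2), (3, 4)]))   # [3, 7]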
A combination of starmap() and map_async() that iterates over an iterable of iterables and calls func with the iterables unpacked. Returns a result object.
Added in version 3.3.
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.
Wait for the worker processes to exit. One must call close() or terminate() before using join().
Changed in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
The class of the result returned by Pool.apply_async() and Pool.map_async().
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
Wait until the result is available or until timeout seconds pass.
Return whether the call has completed.
Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
Changed in version 3.7: If the result is not ready, ValueError is raised instead of AssertionError.
The following example demonstrates the use of a pool:
    from multiprocessing import Pool
    import time

    def f(x):
        return x*x

    if __name__ == '__main__':
        with Pool(processes=4) as pool:         # start 4 worker processes
            result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
            print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

            print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

            it = pool.imap(f, range(10))
            print(next(it))                     # prints "0"
            print(next(it))                     # prints "1"
            print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

            result = pool.apply_async(time.sleep, (10,))
            print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().
However, the multiprocessing.connection module allows some extra flexibility. It basically gives a high level message oriented API for dealing with sockets or Windows named pipes. It also has support for digest authentication using the hmac module, and for polling multiple connections at the same time.
Send a randomly generated message to the other end of the connection and wait for a reply.
If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise AuthenticationError is raised.
Receive a message, calculate the digest of the message using authkey as the key, and then send the digest back.
If a welcome message is not received, then AuthenticationError is raised.
Attempt to set up a connection to the listener which is using address address, returning a Connection.
The type of the connection is determined by the family argument, but this can generally be omitted since it can usually be inferred from the format of address. (See Address Formats.)
If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.
A wrapper for a bound socket or Windows named pipe which is ‘listening’ for connections.
address is the address to be used by the bound socket or named pipe of the listener object.
Note
If an address of ‘0.0.0.0’ is used, the address will not be a connectable end point on Windows. If you require a connectable end-point, you should use ‘127.0.0.1’.
family is the type of socket (or named pipe) to use. This can be one of the strings 'AF_INET' (for a TCP socket), 'AF_UNIX' (for a Unix domain socket) or 'AF_PIPE' (for a Windows named pipe). Of these only the first is guaranteed to be available. If family is None then the family is inferred from the format of address. If address is also None then a default is chosen. This default is the family which is assumed to be the fastest available. See Address Formats. Note that if family is 'AF_UNIX' and address is None then the socket will be created in a private temporary directory created using tempfile.mkstemp().
If the listener object uses a socket then backlog (1 by default) is passed to the listen() method of the socket once it has been bound.
If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.
Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.
Close the bound socket or named pipe of the listener object. This is called automatically when the listener is garbage collected. However it is advisable to call it explicitly.
Listener objects have the following read-only properties:
The address which is being used by the Listener object.
The address from which the last accepted connection came. If this is unavailable then it is None.
Changed in version 3.3: Listener objects now support the context management protocol – see Context Manager Types. __enter__() returns the listener object, and __exit__() calls close().
Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.
For both POSIX and Windows, an object can appear in object_list if it is
a readable Connection object;
a connected and readable socket.socket object; or
the sentinel attribute of a Process object.
A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.
POSIX: wait(object_list, timeout) is almost equivalent to select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.
Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)
Added in version 3.3.
Examples
The following server code creates a listener which uses 'secret password' as an authentication key. It then waits for a connection and sends some data to the client:
    from multiprocessing.connection import Listener
    from array import array

    address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

    with Listener(address, authkey=b'secret password') as listener:
        with listener.accept() as conn:
            print('connection accepted from', listener.last_accepted)

            conn.send([2.25, None, 'junk', float])

            conn.send_bytes(b'hello')

            conn.send_bytes(array('i', [42, 1729]))
The following code connects to the server and receives some data from the server:
    from multiprocessing.connection import Client
    from array import array

    address = ('localhost', 6000)

    with Client(address, authkey=b'secret password') as conn:
        print(conn.recv())                  # => [2.25, None, 'junk', float]

        print(conn.recv_bytes())            # => 'hello'

        arr = array('i', [0, 0, 0, 0, 0])
        print(conn.recv_bytes_into(arr))    # => 8
        print(arr)                          # => array('i', [42, 1729, 0, 0, 0])
The following code uses wait() to wait for messages from multiple processes at once:
    import time, random
    from multiprocessing import Process, Pipe, current_process
    from multiprocessing.connection import wait

    def foo(w):
        for i in range(10):
            w.send((i, current_process().name))
        w.close()

    if __name__ == '__main__':
        readers = []

        for i in range(4):
            r, w = Pipe(duplex=False)
            readers.append(r)
            p = Process(target=foo, args=(w,))
            p.start()
            # We close the writable end of the pipe now to be sure that
            # p is the only process which owns a handle for it.  This
            # ensures that when p closes its handle for the writable end,
            # wait() will promptly report the readable end as being ready.
            w.close()

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                else:
                    print(msg)
An 'AF_INET' address is a tuple of the form (hostname, port) where hostname is a string and port is an integer.
An 'AF_UNIX' address is a string representing a filename on the filesystem.
An 'AF_PIPE' address is a string of the form r'\\.\pipe\PipeName'. To use Client() to connect to a named pipe on a remote computer called ServerName one should use an address of the form r'\\ServerName\pipe\PipeName' instead.
Note that any string beginning with two backslashes is assumed by default to be an 'AF_PIPE' address rather than an 'AF_UNIX' address.
When one uses Connection.recv, the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.
An authentication key is a byte string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)
If authentication is requested but no authentication key is specified then the return value of current_process().authkey is used (see Process). This value will be automatically inherited by any Process object that the current process creates. This means that (by default) all processes of a multi-process program will share a single authentication key which can be used when setting up connections between themselves.
Suitable authentication keys can also be generated by using os.urandom().
Some support for logging is available. Note, however, that the logging package does not use process shared locks so it is possible (depending on the handler type) for messages from different processes to get mixed up.
Returns the logger used by multiprocessing. If necessary, a new one will be created.
When first created the logger has level logging.NOTSET and no default handler. Messages sent to this logger will not by default propagate to the root logger.
Note that on Windows child processes will only inherit the level of the parent process’s logger – any other customization of the logger will not be inherited.
This function performs a call to get_logger() but in addition to returning the logger created by get_logger, it adds a handler which sends output to sys.stderr using the format '[%(levelname)s/%(processName)s] %(message)s'. You can modify the levelname of the logger by passing a level argument.
Below is an example session with logging turned on:
    >>> import multiprocessing, logging
    >>> logger = multiprocessing.log_to_stderr()
    >>> logger.setLevel(logging.INFO)
    >>> logger.warning('doomed')
    [WARNING/MainProcess] doomed
    >>> m = multiprocessing.Manager()
    [INFO/SyncManager-...] child process calling self.run()
    [INFO/SyncManager-...] created temp directory /.../pymp-...
    [INFO/SyncManager-...] manager serving at '/.../listener-...'
    >>> del m
    [INFO/MainProcess] sending shutdown message to manager
    [INFO/SyncManager-...] manager exiting with exitcode 0
For a full table of logging levels, see the logging module.
The multiprocessing.dummy module ¶
multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.
In particular, the Pool function provided by multiprocessing.dummy returns an instance of ThreadPool, which is a subclass of Pool that supports all the same method calls but uses a pool of worker threads rather than worker processes.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
ThreadPool instances are fully interface compatible with Pool instances, and their resources must also be properly managed, either by using the pool as a context manager or by calling close() and terminate() manually.
processes is the number of worker threads to use. If processes is None then the number returned by os.cpu_count() is used.
If initializer is not None then each worker thread will call initializer(*initargs) when it starts.
Unlike Pool, maxtasksperchild and context cannot be provided.
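A brief sketch of ThreadPool in use (the mapped function is illustrative); since the pool is backed by threads, the if __name__ guard is not strictly required, but keeping one does no harm:

    from multiprocessing.pool import ThreadPool

    def fetch_length(s):
        return len(s)

    if __name__ == '__main__':
        with ThreadPool(4) as pool:
            print(pool.map(fetch_length, ['a', 'bb', 'ccc']))   # [1, 2, 3]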
Note
A ThreadPool shares the same interface as Pool, which is designed around a pool of processes and predates the introduction of the concurrent.futures module. As such, it inherits some operations that don't make sense for a pool backed by threads, and it has its own type for representing the status of asynchronous jobs, AsyncResult, that is not understood by any other libraries.
Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.
There are certain guidelines and idioms which should be adhered to when using multiprocessing.
The following applies to all start methods.
Avoid shared state
As far as possible one should try to avoid shifting large amounts of data between processes.
It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives.
Picklability
Ensure that the arguments to the methods of proxies are picklable.
Thread safety of proxies
Do not use a proxy object from more than one thread unless you protect it with a lock.
(There is never a problem with different processes using the same proxy.)
Joining zombie processes
On POSIX when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process's Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.
Better to inherit than pickle/unpickle
When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
Avoid terminating processes
Using the Process.terminate method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.
Therefore it is probably best to only consider using Process.terminate on processes which never use any shared resources.
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
An example which will deadlock is the following:
    from multiprocessing import Process, Queue

    def f(q):
        q.put('X' * 1000000)

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        p.join()                    # this deadlocks
        obj = queue.get()
A fix here would be to swap the last two lines (or simply remove the p.join() line).
Explicitly pass resources to child processes
On POSIX using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.
Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.
So for instance
    from multiprocessing import Process, Lock

    def f():
        ... do something using "lock" ...

    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=f).start()
should be rewritten as
    from multiprocessing import Process, Lock

    def f(l):
        ... do something using "l" ...

    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=f, args=(lock,)).start()
Beware of replacing sys.stdin with a "file like object"
multiprocessing originally unconditionally called:

    os.close(sys.stdin.fileno())

in the multiprocessing.Process._bootstrap() method; this resulted in issues with processes-in-processes. This has been changed to:

    sys.stdin.close()
    sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

This solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace sys.stdin() with a "file-like object" with output buffering. This danger is that if multiple processes call close() on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.
If you write a file-like object and implement your own caching, you can make it fork-safe by storing the pid whenever you append to the cache, and discarding the cache when the pid changes. For example:
    @property
    def cache(self):
        pid = os.getpid()
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache
There are a few extra restrictions which don’t apply to the fork start method.
More picklability
Ensure that all arguments to Process.__init__() are picklable. Also, if you subclass Process then make sure that instances will be picklable when the Process.start method is called.
Global variables
Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called.
However, global variables which are just module level constants cause no problems.
Safe importing of main module
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).
For example, using the spawn or forkserver start method, running the following module would fail with a RuntimeError:
    from multiprocessing import Process

    def foo():
        print('hello')

    p = Process(target=foo)
    p.start()
Instead one should protect the "entry point" of the program by using if __name__ == '__main__': as follows:

    from multiprocessing import Process, freeze_support, set_start_method

    def foo():
        print('hello')

    if __name__ == '__main__':
        freeze_support()
        set_start_method('spawn')
        p = Process(target=foo)
        p.start()

(The freeze_support() line can be omitted if the program will be run normally instead of frozen.)
This allows the newly spawned Python interpreter to safely import the module and then run the module's foo() function.
Similar restrictions apply if a pool or manager is created in the main module.
Demonstration of how to create and use customized managers and proxies:
    from multiprocessing import freeze_support
    from multiprocessing.managers import BaseManager, BaseProxy
    import operator

    ##

    class Foo:
        def f(self):
            print('you called Foo.f()')
        def g(self):
            print('you called Foo.g()')
        def _h(self):
            print('you called Foo._h()')

    # A simple generator function
    def baz():
        for i in range(10):
            yield i*i

    # Proxy type for generator objects
    class GeneratorProxy(BaseProxy):
        _exposed_ = ['__next__']
        def __iter__(self):
            return self
        def __next__(self):
            return self._callmethod('__next__')

    # Function to return the operator module
    def get_operator_module():
        return operator

    ##

    class MyManager(BaseManager):
        pass

    # register the Foo class; make `f()` and `g()` accessible via proxy
    MyManager.register('Foo1', Foo)

    # register the Foo class; make `g()` and `_h()` accessible via proxy
    MyManager.register('Foo2', Foo, exposed=('g', '_h'))

    # register the generator function baz; use `GeneratorProxy` to make proxies
    MyManager.register('baz', baz, proxytype=GeneratorProxy)

    # register get_operator_module(); make public functions accessible via proxy
    MyManager.register('operator', get_operator_module)

    ##

    def test():
        manager = MyManager()
        manager.start()

        print('-' * 20)

        f1 = manager.Foo1()
        f1.f()
        f1.g()
        assert not hasattr(f1, '_h')
        assert sorted(f1._exposed_) == sorted(['f', 'g'])

        print('-' * 20)

        f2 = manager.Foo2()
        f2.g()
        f2._h()
        assert not hasattr(f2, 'f')
        assert sorted(f2._exposed_) == sorted(['g', '_h'])

        print('-' * 20)

        it = manager.baz()
        for i in it:
            print('<%d>' % i, end=' ')
        print()

        print('-' * 20)

        op = manager.operator()
        print('op.add(23, 45) =', op.add(23, 45))
        print('op.pow(2, 94) =', op.pow(2, 94))
        print('op._exposed_ =', op._exposed_)

    ##

    if __name__ == '__main__':
        freeze_support()
        test()
Using Pool:
    import multiprocessing
    import time
    import random
    import sys

    #
    # Functions used by test code
    #

    def calculate(func, args):
        result = func(*args)
        return '%s says that %s%s = %s' % (
            multiprocessing.current_process().name,
            func.__name__, args, result
            )

    def calculatestar(args):
        return calculate(*args)

    def mul(a, b):
        time.sleep(0.5 * random.random())
        return a * b

    def plus(a, b):
        time.sleep(0.5 * random.random())
        return a + b

    def f(x):
        return 1.0 / (x - 5.0)

    def pow3(x):
        return x ** 3

    def noop(x):
        pass

    #
    # Test code
    #

    def test():
        PROCESSES = 4
        print('Creating pool with %d processes\n' % PROCESSES)

        with multiprocessing.Pool(PROCESSES) as pool:
            #
            # Tests
            #

            TASKS = [(mul, (i, 7)) for i in range(10)] + \
                    [(plus, (i, 8)) for i in range(10)]

            results = [pool.apply_async(calculate, t) for t in TASKS]
            imap_it = pool.imap(calculatestar, TASKS)
            imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

            print('Ordered results using pool.apply_async():')
            for r in results:
                print('\t', r.get())
            print()

            print('Ordered results using pool.imap():')
            for x in imap_it:
                print('\t', x)
            print()

            print('Unordered results using pool.imap_unordered():')
            for x in imap_unordered_it:
                print('\t', x)
            print()

            print('Ordered results using pool.map() --- will block till complete:')
            for x in pool.map(calculatestar, TASKS):
                print('\t', x)
            print()

            #
            # Test error handling
            #

            print('Testing error handling:')

            try:
                print(pool.apply(f, (5,)))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.apply()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(pool.map(f, list(range(10))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.map()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(list(pool.imap(f, list(range(10)))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from list(pool.imap())')
            else:
                raise AssertionError('expected ZeroDivisionError')

            it = pool.imap(f, list(range(10)))
            for i in range(10):
                try:
                    x = next(it)
                except ZeroDivisionError:
                    if i == 5:
                        pass
                except StopIteration:
                    break
                else:
                    if i == 5:
                        raise AssertionError('expected ZeroDivisionError')
            assert i == 9
            print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
            print()

            #
            # Testing timeouts
            #

            print('Testing ApplyResult.get() with timeout:', end=' ')
            res = pool.apply_async(calculate, TASKS[0])
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % res.get(0.02))
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()

            print('Testing IMapIterator.next() with timeout:', end=' ')
            it = pool.imap(calculatestar, TASKS)
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % it.next(0.02))
                except StopIteration:
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()


    if __name__ == '__main__':
        multiprocessing.freeze_support()
        test()
An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:
    import time
    import random

    from multiprocessing import Process, Queue, current_process, freeze_support

    #
    # Function run by worker processes
    #

    def worker(input, output):
        for func, args in iter(input.get, 'STOP'):
            result = calculate(func, args)
            output.put(result)

    #
    # Function used to calculate result
    #

    def calculate(func, args):
        result = func(*args)
        return '%s says that %s%s = %s' % \
            (current_process().name, func.__name__, args, result)

    #
    # Functions referenced by tasks
    #

    def mul(a, b):
        time.sleep(0.5*random.random())
        return a * b

    def plus(a, b):
        time.sleep(0.5*random.random())
        return a + b

    #
    #
    #

    def test():
        NUMBER_OF_PROCESSES = 4
        TASKS1 = [(mul, (i, 7)) for i in range(20)]
        TASKS2 = [(plus, (i, 8)) for i in range(10)]

        # Create queues
        task_queue = Queue()
        done_queue = Queue()

        # Submit tasks
        for task in TASKS1:
            task_queue.put(task)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()

        # Get and print results
        print('Unordered results:')
        for i in range(len(TASKS1)):
            print('\t', done_queue.get())

        # Add more tasks using `put()`
        for task in TASKS2:
            task_queue.put(task)

        # Get and print some more results
        for i in range(len(TASKS2)):
            print('\t', done_queue.get())

        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')


    if __name__ == '__main__':
        freeze_support()
        test()