multiprocessing.shared_memory
— 用于跨进程直接访问的共享内存
¶
源代码: Lib/multiprocessing/shared_memory.py
3.8 版新增。
此模块提供类
SharedMemory
,用于分配和管理由多核或 SMP (对称多处理器) 机器中的一个或多个进程所访问的共享内存。为辅助共享内存的生命周期管理,尤其是跨不同进程,
BaseManager
子类,
SharedMemoryManager
,也有提供在
multiprocessing.managers
模块。
在此模块中,共享内存指的是 "系统 V 样式" 的共享内存块 (尽管不必明确这样实现),而不是指 "分布式共享内存"。这种样式的共享内存准许截然不同的进程对易失性内存公用 (或共享) 区域进行潜在读/写。按照惯例限制进程只能访问自己的进程内存空间,但共享内存准许在进程之间共享数据,避免在包含该数据的进程之间代替需要发送消息。凭借内存直接共享数据可以提供显著的性能优势,相比凭借磁盘、套接字、其它要求序列化/反序列化通信的共享数据及拷贝数据。
创建新的共享内存块 (或附加到现有共享内存块)。每个共享内存块赋值唯一名称。按此方式,一个进程可以采用特定名称创建共享内存块,且不同进程使用相同名称可以附加到同一共享内存块。
作为跨进程共享数据的资源,共享内存块可能比创建它们的原始进程长寿。当一个进程不再需要访问,其它进程可能仍然需要的共享内存块时,
close()
方法应被调用。当任何进程不再需要共享内存块时,
unlink()
方法应被调用以确保适当清理。
name
是用于请求共享内存的唯一名称,以字符串形式指定。当创建新的共享内存块时,若
None
(默认) 被供给为名称,将生成 novel (新颖) 名称。
create
控制是创建新的共享内存块 (
True
) 还是附加到现有共享内存块 (
False
).
size
指定请求的字节数,当创建新的共享内存块时。因为某些平台会基于平台内存页面大小来分配内存块,共享内存块的准确大小可能 >= 请求大小。当附加到现有共享内存块时,
size
参数被忽略。
关闭对来自此实例共享内存的访问。为确保资源的适当清理,所有实例都应调用
close()
一旦实例不再需要。注意,调用
close()
不会导致共享内存块本身被销毁。
请求销毁底层共享内存块。为确保资源的适当清理,
unlink()
应该被调用一次 (且仅一次) 跨需要共享内存块的所有进程。在请求销毁它之后,共享内存块可能 (或不可能) 被立即销毁,且这种行为跨平台可能不同。试图访问共享内存块内的数据后于
unlink()
被调用,可能导致内存访问错误。注意:最后交出其保持的共享内存块的进程可以调用
unlink()
and
close()
按任一次序。
共享内存块内容的内存视图。
只读访问共享内存块的唯一名称。
只读访问共享内存块的字节大小。
以下范例演示低级用法为
SharedMemory
实例:
>>> from multiprocessing import shared_memory >>> shm_a = shared_memory.SharedMemory(create=True, size=10) >>> type(shm_a.buf) <class 'memoryview'> >>> buffer = shm_a.buf >>> len(buffer) 10 >>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once >>> buffer[4] = 100 # Modify single byte at a time >>> # Attach to an existing shared memory block >>> shm_b = shared_memory.SharedMemory(shm_a.name) >>> import array >>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array array('b', [22, 33, 44, 55, 100]) >>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes >>> bytes(shm_a.buf[:5]) # Access via shm_a b'howdy' >>> shm_b.close() # Close each SharedMemory instance >>> shm_a.close() >>> shm_a.unlink() # Call unlink only once to release the shared memory
以下范例演示实际使用
SharedMemory
类采用
NumPy 数组
,访问同一
numpy.ndarray
从 2 个不同的 Python Shell:
>>> # In the first Python interactive shell >>> import numpy as np >>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array >>> from multiprocessing import shared_memory >>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes) >>> # Now create a NumPy array backed by shared memory >>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) >>> b[:] = a[:] # Copy the original data into shared memory >>> b array([1, 1, 2, 3, 5, 8]) >>> type(b) <class 'numpy.ndarray'> >>> type(a) <class 'numpy.ndarray'> >>> shm.name # We did not specify a name so one was chosen for us 'psm_21467_46075' >>> # In either the same shell or a new Python shell on the same machine >>> import numpy as np >>> from multiprocessing import shared_memory >>> # Attach to the existing shared memory block >>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075') >>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example >>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf) >>> c array([1, 1, 2, 3, 5, 8]) >>> c[-1] = 888 >>> c array([ 1, 1, 2, 3, 5, 888]) >>> # Back in the first Python interactive shell, b reflects this change >>> b array([ 1, 1, 2, 3, 5, 888]) >>> # Clean up from within the second Python shell >>> del c # Unnecessary; merely emphasizing the array is no longer used >>> existing_shm.close() >>> # Clean up from within the first Python shell >>> del b # Unnecessary; merely emphasizing the array is no longer used >>> shm.close() >>> shm.unlink() # Free and release the shared memory block at the very end
子类化的
BaseManager
可以用于跨进程管理共享内存块。
调用
start()
在
SharedMemoryManager
实例将导致启动新进程。此新进程的唯一用途,是管理透过它创建的所有共享内存块的生命周期。要触发由该进程管理的所有共享内存块的释放,调用
shutdown()
在实例。这触发
SharedMemory.unlink()
调用当所有
SharedMemory
对象由该进程管理,然后停止进程本身。通过创建
SharedMemory
实例透过
SharedMemoryManager
,避免需要手动追踪和触发共享内存资源的释放。
此类提供的方法用于创建并返回
SharedMemory
实例和用于创建像列表对象 (
ShareableList
) 以共享内存作后盾。
参考
multiprocessing.managers.BaseManager
了解描述为继承的
address
and
authkey
可选输入自变量及如何使用它们以连接到现有
SharedMemoryManager
服务从其它进程。
创建并返回新的
SharedMemory
对象采用指定
size
以字节为单位。
创建并返回新的
ShareableList
对象,初始值来自输入
sequence
.
以下范例演示基本机制为
SharedMemoryManager
:
>>> from multiprocessing.managers import SharedMemoryManager >>> smm = SharedMemoryManager() >>> smm.start() # Start the process that manages the shared memory blocks >>> sl = smm.ShareableList(range(4)) >>> sl ShareableList([0, 1, 2, 3], name='psm_6572_7512') >>> raw_shm = smm.SharedMemory(size=128) >>> another_sl = smm.ShareableList('alpha') >>> another_sl ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221') >>> smm.shutdown() # Calls unlink() on sl, raw_shm, and another_sl
以下范例叙述潜在更方便的模式为使用
SharedMemoryManager
对象凭借
with
语句以确保释放所有共享内存块,当不再需要它们时:
>>> with SharedMemoryManager() as smm: ... sl = smm.ShareableList(range(2000)) ... # Divide the work among two processes, storing partial results in sl ... p1 = Process(target=do_work, args=(sl, 0, 1000)) ... p2 = Process(target=do_work, args=(sl, 1000, 2000)) ... p1.start() ... p2.start() # A multiprocessing.Pool might be more efficient ... p1.join() ... p2.join() # Wait for all work to complete in both processes ... total_result = sum(sl) # Consolidate the partial results now in sl
当使用
SharedMemoryManager
在
with
语句,会释放使用该管理器创建的所有共享内存块,当
with
语句的代码块完成执行。
提供所有存储值存储在共享内存块中的像可变列表的对象。这将可存储值约束到仅
int
,
float
,
bool
,
str
(每个小于 10MB),
bytes
(每个小于 10MB),和
None
内置数据类型。它也明显不同于内置
list
类型,不可以改变这些列表的整体长度 (即:不能追加、插入、等),且不支持动态创建新的
ShareableList
实例凭借切片。
sequence
是用于填充新
ShareableList
的完整值。设为
None
而不是附加到已存在
ShareableList
通过其唯一共享内存名称。
name
是请求共享内存的唯一名称,如定义中描述的对于
SharedMemory
。当附加到现有
ShareableList
,指定其共享内存块的唯一名称,而剩下的
sequence
设为
None
.
返回出现数为
value
.
返回第一索引位置为
value
。引发
ValueError
if
value
不存在。
只读属性包含
struct
打包格式,用于所有的目前存储值。
SharedMemory
实例,在其中存储值。
以下范例演示基本用法为
ShareableList
实例:
>>> from multiprocessing import shared_memory >>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42]) >>> [ type(entry) for entry in a ] [<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>] >>> a[2] -273.154 >>> a[2] = -78.5 >>> a[2] -78.5 >>> a[2] = 'dry ice' # Changing data types is supported as well >>> a[2] 'dry ice' >>> a[2] = 'larger than previously allocated storage space' Traceback (most recent call last): ... ValueError: exceeds available storage for existing str >>> a[2] 'dry ice' >>> len(a) 7 >>> a.index(42) 6 >>> a.count(b'howdy') 0 >>> a.count(b'HoWdY') 1 >>> a.shm.close() >>> a.shm.unlink() >>> del a # Use of a ShareableList after call to unlink() is unsupported
以下范例叙述 1 个、2 个或多个进程如何访问同一
ShareableList
通过提供共享内存块名称,稍后:
>>> b = shared_memory.ShareableList(range(5)) # In a first process >>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process >>> c ShareableList([0, 1, 2, 3, 4], name='...') >>> c[-1] = -999 >>> b[-1] -999 >>> b.shm.close() >>> c.shm.close() >>> c.shm.unlink()
以下范例演示
ShareableList
(和底层
SharedMemory
) 对象可以被腌制和取消腌制,若需要。注意,它仍然是同一共享对象。这会发生,是因为反序列化对象拥有相同唯一名称,且恰好采用同一名称附加到现有对象 (若对象仍存活):
>>> import pickle >>> from multiprocessing import shared_memory >>> sl = shared_memory.ShareableList(range(10)) >>> list(sl) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl)) >>> list(deserialized_sl) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1 >>> deserialized_sl[1] = -2 >>> list(sl) [-1, -2, 2, 3, 4, 5, 6, 7, 8, 9] >>> list(deserialized_sl) [-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close() >>> sl.shm.unlink()