multiprocessing.shared_memory — 用于跨进程直接访问的共享内存

源代码: Lib/multiprocessing/shared_memory.py

3.8 版新增。


此模块提供类 SharedMemory ,用于分配和管理由多核或 SMP (对称多处理器) 机器中的一个或多个进程所访问的共享内存。为辅助共享内存的生命周期管理,尤其是跨不同进程, BaseManager 子类, SharedMemoryManager ,也有提供在 multiprocessing.managers 模块。

在此模块中,共享内存指的是 "系统 V 样式" 的共享内存块 (尽管不必明确这样实现),而不是指 "分布式共享内存"。这种样式的共享内存准许截然不同的进程对易失性内存公用 (或共享) 区域进行潜在读/写。按照惯例限制进程只能访问自己的进程内存空间,但共享内存准许在进程之间共享数据,避免在包含该数据的进程之间代替需要发送消息。凭借内存直接共享数据可以提供显著的性能优势,相比凭借磁盘、套接字、其它要求序列化/反序列化通信的共享数据及拷贝数据。

class multiprocessing.shared_memory. SharedMemory ( name = None , create = False , size = 0 )

创建新的共享内存块 (或附加到现有共享内存块)。每个共享内存块赋值唯一名称。按此方式,一个进程可以采用特定名称创建共享内存块,且不同进程使用相同名称可以附加到同一共享内存块。

作为跨进程共享数据的资源,共享内存块可能比创建它们的原始进程长寿。当一个进程不再需要访问,其它进程可能仍然需要的共享内存块时, close() 方法应被调用。当任何进程不再需要共享内存块时, unlink() 方法应被调用以确保适当清理。

name 是用于请求共享内存的唯一名称,以字符串形式指定。当创建新的共享内存块时,若 None (默认) 被供给为名称,将生成 novel (新颖) 名称。

create 控制是创建新的共享内存块 ( True ) 还是附加到现有共享内存块 ( False ).

size 指定请求的字节数,当创建新的共享内存块时。因为某些平台会基于平台内存页面大小来分配内存块,共享内存块的准确大小可能 >= 请求大小。当附加到现有共享内存块时, size 参数被忽略。

close ( )

关闭对来自此实例共享内存的访问。为确保资源的适当清理,所有实例都应调用 close() 一旦实例不再需要。注意,调用 close() 不会导致共享内存块本身被销毁。

请求销毁底层共享内存块。为确保资源的适当清理, unlink() 应该被调用一次 (且仅一次) 跨需要共享内存块的所有进程。在请求销毁它之后,共享内存块可能 (或不可能) 被立即销毁,且这种行为跨平台可能不同。试图访问共享内存块内的数据后于 unlink() 被调用,可能导致内存访问错误。注意:最后交出其保持的共享内存块的进程可以调用 unlink() and close() 按任一次序。

buf

共享内存块内容的内存视图。

name

只读访问共享内存块的唯一名称。

size

只读访问共享内存块的字节大小。

以下范例演示低级用法为 SharedMemory 实例:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory
						

以下范例演示实际使用 SharedMemory 类采用 NumPy 数组 ,访问同一 numpy.ndarray 从 2 个不同的 Python Shell:

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'
>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])
>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
						
class multiprocessing.managers. SharedMemoryManager ( [ address [ , authkey ] ] )

子类化的 BaseManager 可以用于跨进程管理共享内存块。

调用 start() SharedMemoryManager 实例将导致启动新进程。此新进程的唯一用途,是管理透过它创建的所有共享内存块的生命周期。要触发由该进程管理的所有共享内存块的释放,调用 shutdown() 在实例。这触发 SharedMemory.unlink() 调用当所有 SharedMemory 对象由该进程管理,然后停止进程本身。通过创建 SharedMemory 实例透过 SharedMemoryManager ,避免需要手动追踪和触发共享内存资源的释放。

此类提供的方法用于创建并返回 SharedMemory 实例和用于创建像列表对象 ( ShareableList ) 以共享内存作后盾。

参考 multiprocessing.managers.BaseManager 了解描述为继承的 address and authkey 可选输入自变量及如何使用它们以连接到现有 SharedMemoryManager 服务从其它进程。

SharedMemory ( size )

创建并返回新的 SharedMemory 对象采用指定 size 以字节为单位。

ShareableList ( sequence )

创建并返回新的 ShareableList 对象,初始值来自输入 sequence .

以下范例演示基本机制为 SharedMemoryManager :

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl
						

以下范例叙述潜在更方便的模式为使用 SharedMemoryManager 对象凭借 with 语句以确保释放所有共享内存块,当不再需要它们时:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl
						

当使用 SharedMemoryManager with 语句,会释放使用该管理器创建的所有共享内存块,当 with 语句的代码块完成执行。

class multiprocessing.shared_memory. ShareableList ( sequence = None , * , name = None )

提供所有存储值存储在共享内存块中的像可变列表的对象。这将可存储值约束到仅 int , float , bool , str (每个小于 10MB), bytes (每个小于 10MB),和 None 内置数据类型。它也明显不同于内置 list 类型,不可以改变这些列表的整体长度 (即:不能追加、插入、等),且不支持动态创建新的 ShareableList 实例凭借切片。

sequence 是用于填充新 ShareableList 的完整值。设为 None 而不是附加到已存在 ShareableList 通过其唯一共享内存名称。

name 是请求共享内存的唯一名称,如定义中描述的对于 SharedMemory 。当附加到现有 ShareableList ,指定其共享内存块的唯一名称,而剩下的 sequence 设为 None .

count ( value )

返回出现数为 value .

index ( value )

返回第一索引位置为 value 。引发 ValueError if value 不存在。

format

只读属性包含 struct 打包格式,用于所有的目前存储值。

shm

SharedMemory 实例,在其中存储值。

以下范例演示基本用法为 ShareableList 实例:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported
						

以下范例叙述 1 个、2 个或多个进程如何访问同一 ShareableList 通过提供共享内存块名称,稍后:

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()
						

以下范例演示 ShareableList (和底层 SharedMemory ) 对象可以被腌制和取消腌制,若需要。注意,它仍然是同一共享对象。这会发生,是因为反序列化对象拥有相同唯一名称,且恰好采用同一名称附加到现有对象 (若对象仍存活):

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
						
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
						
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
						
>>> sl.shm.close()
>>> sl.shm.unlink()
						

上一话题

multiprocessing — 基于进程的并行

下一话题

concurrent

本页