multiprocessing.shared_memory --- 對於共享記憶體的跨行程直接存取

原始碼:Lib/multiprocessing/shared_memory.py

在 3.8 版新加入.


該模組提供了一個 SharedMemory 類別,用於分配和管理被多核心或對稱多處理器 (symmetric multiprocessor, SMP) 機器上的一個或多個行程存取的共享記憶體。為了協助共享記憶體的生命週期管理,特別是跨不同行程的管理,multiprocessing.managers 模組中還提供了一個 BaseManager 子類別 SharedMemoryManager

在此模組中,共享記憶體是指「System V 風格」的共享記憶體區塊(儘管不一定如此明確實作),而不是指「分散式共享記憶體 (distributed shared memory)」。這種型別的共享記憶體允許不同的行程潛在地讀取和寫入揮發性記憶體 (volatile memory) 的公開(或共享)區域。通常行程只能存取自己的行程記憶體空間,但共享記憶體允許在行程之間共享資料,從而避免需要跨行程傳遞資料的情境。與透過硬碟或 socket 或其他需要序列化/還原序列化 (serialization/deserialization) 和複製資料的通訊方式以共享資料相比,直接透過記憶體共享資料可以提供顯著的性能優勢。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

建立新的共享記憶體區塊或附加到現有的共享記憶體區塊。每個共享記憶體區塊都被分配了一個唯一的名稱。透過這種方式,一個行程可以建立具有特定名稱的共享記憶體區塊,而不同的行程可以使用該相同名稱附加到同一共享記憶體區塊。

作為跨行程共享資料的資源,共享記憶體區塊的壽命可能比建立它們的原始行程還要長。當一個行程不再需要存取但其他行程可能仍需要的共享記憶體區塊時,應該呼叫 close() 方法。當任何行程不再需要共享記憶體區塊時,應呼叫 unlink() 方法以確保正確清理。

name 是所請求的共享記憶體的唯一名稱,指定為字串。建立新的共享記憶體區塊時,如果名稱提供為 None(預設值),則會生成一個新的名稱。

create 控制是否建立新的共享記憶體區塊 (True) 或附加現有的共享記憶體區塊 (False)。

size 指定了建立新共享記憶體區塊時請求的位元組數。由於某些平台會根據該平台的記憶體頁 (memory page) 大小來選擇分配記憶體區塊,因此共享記憶體區塊的確切大小可能大於或等於請求的大小。當附加到現有共享記憶體區塊時,size 參數將被忽略。

close()

關閉從此實例對共享記憶體的存取。為了確保正確清理資源,一旦實例不再被需要,所有實例都應該呼叫 close()。請注意,呼叫 close() 不會使得共享記憶體區塊本身被銷毀。

請求銷毀底層共享記憶體區塊。為了確保正確清理資源,應該在需要共享記憶體區塊的所有行程中呼叫一次(且僅一次)unlink()。請求銷毀後,共享記憶體區塊可能會也可能不會立即銷毀,此行為可能因平台而異。呼叫 unlink() 後嘗試存取共享記憶體區塊內的資料可能會導致記憶體存取錯誤。注意:最後一個放棄持有某共享記憶體區塊的行程可以按任意順序呼叫 unlink()close()

buf

共享記憶體區塊內容的記憶體視圖 (memoryview)。

name

對共享記憶體區塊之唯一名稱的唯讀存取。

size

對共享記憶體區塊大小(以位元組為單位)的唯讀存取。

以下範例示範了 SharedMemory 實例的低階使用方式:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下範例示範了 SharedMemory 類別與 NumPy 陣列的實際用法:從兩個不同的 Python shell 存取相同的 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

BaseManager 的子類別,可用於跨行程管理共享記憶體區塊。

SharedMemoryManager 實例上呼叫 start() 會啟動一個新行程。這個新行程的唯一目的是管理那些透過它建立出的所有共享記憶體區塊的生命週期。要觸發釋放該行程管理的所有共享記憶體區塊,請在實例上呼叫 shutdown(),這會觸發對該行程管理的所有 SharedMemory 物件的 SharedMemory.unlink() 呼叫,然後再停止這個行程。透過 SharedMemoryManager 建立 SharedMemory 實例,我們無需手動追蹤和觸發共享記憶體資源的釋放。

此類別提供了用於建立和回傳 SharedMemory 實例以及建立由共享記憶體支援的類串列物件 (ShareableList) 的方法。

請參閱 multiprocessing.managers.BaseManager 了解繼承的 addressauthkey 可選輸入引數的描述以及如何使用它們從其他行程連接到現有的 SharedMemoryManager 服務。

SharedMemory(size)

建立並回傳一個新的 SharedMemory 物件,該物件具有指定的 size(以位元組為單位)。

ShareableList(sequence)

建立並回傳一個新的 ShareableList 物件,該物件由輸入 sequence 中的值初始化。

以下範例示範了 SharedMemoryManager 的基本作用機制:

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下範例描述了一種可能更方便的模式,即透過 with 陳述式使用 SharedMemoryManager 物件,以確保所有共享記憶體區塊不再被需要後都被釋放:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with 陳述式中使用 SharedMemoryManager 時,當 with 陳述式的程式碼區塊執行完畢時,使用該管理器建立的共享記憶體區塊都會被釋放。

class multiprocessing.shared_memory.ShareableList(sequence=None, \*, name=None)

提供一個類似串列的可變物件,其中有被儲存的所有值都儲存在共享記憶體區塊中。這限制了可存儲的值只能夠是內建資料型別 int(帶符號的 64 位元 (signed 64-bit))、floatboolstr(編碼為 utf-8 時個別小於 10M 位元組)、bytes(個別小於 10M 位元組)和 None。它還與內建的 list 型別有顯著不同,因為這些串列不能更改其總長度(即不能追加 (append)、插入等),並且不支援透過切片動態建立新的 ShareableList 實例。

sequence 用於填充 (populate) 一個充滿值的新 ShareableList。設定為 None 以透過其唯一的共享記憶體名稱來附加到已經存在的 ShareableList

SharedMemory 的定義中所述,name 是被請求之共享記憶體的唯一名稱。當附加到現有的 ShareableList 時,指定其共享記憶體區塊的唯一名稱,同時將 sequence 設定為 None

備註

bytesstr 值存在一個已知問題。如果它們以 \x00 nul 位元組或字元結尾,那麼當透過索引從 ShareableList 中獲取它們時,這些位元組或字元可能會被默默地剝離 (silently stripped)。這種 .rstrip(b'\x00') 行為被認為是一個錯誤,將來可能會消失。請參閱 gh-106939

對於去除尾隨空值 (rstripping of trailing nulls) 會出問題的應用程式,變通解法 (workaround) 是始終無條件地在儲存時於此類值的末尾追加一個額外非 0 位元組,並在獲取時也無條件地刪除它:

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

回傳 value 出現的次數。

index(value)

回傳 value 的第一個索引位置。如果 value 不存在,則引發 ValueError

format

唯讀屬性,包含所有目前有儲存的值所使用的 struct 打包格式。

shm

儲存值的 SharedMemory 實例。

以下範例示範了 ShareableList 實例的基本用法:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下範例描述了一個、兩個或多個行程如何透過提供後面的共享記憶體區塊名稱來存取同一個 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下範例示範了如果需要,可以對 ShareableList(和底層 SharedMemory)物件進行 pickle 和 unpickle。請注意,它仍然是相同的共享物件。發生這種情況是因為反序列化的物件具有相同的唯一名稱,並且只是附加到具有相同名稱的現有物件(如果該物件仍然存在):

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()