Queue

class pyrallel.queue.ShmQueue(chunk_size: int = 1048576, maxsize: int = 2, serializer=None, integrity_check: bool = False, deadlock_check: bool = False, deadlock_immanent_check: bool = True, watermark_check: bool = False, use_semaphores: bool = True, verbose: bool = False)

ShmQueue uses shared memory instead of a pipe to exchange data efficiently among processes. Shared memory consists of “System V style” memory blocks that can be shared and accessed directly by processes. This implementation is based on multiprocessing.shared_memory.SharedMemory and hence requires Python >= 3.8. Its interface is almost identical to that of multiprocessing.Queue, but it also allows one to specify the serializer, which by default is pickle.

This implementation maintains two lists: a free buffer list, and a ready message list. The list heads for both lists are stored in a single shared memory area.

The free buffer list is linked by the next_block_id field in each shared buffer’s metadata area.

Messages are built out of chunks. Each chunk occupies a single buffer. Each chunk contains a pointer (an integer identifier) to the next chunk’s buffer using the next_chunk_block_id field in the shared buffer’s metadata area. The list of ready messages links the first chunk of each ready message using the next_block_id field in the shared buffer’s metadata area.
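The two lists can be pictured with a small pure-Python model. This is a sketch of the data layout only, not the library's code: plain dicts stand in for the shared memory areas, and the helpers add_block and get_first_block borrow their names from the method list but are reimplemented here under the assumption that blocks are appended at the tail and removed from the head.

```python
RESERVED = 0xFFFFFFFF  # "no next block" marker, mirrors RESERVED_BLOCK_ID

def new_list_head():
    return {"first_block": RESERVED, "last_block": RESERVED, "block_count": 0}

def add_block(head, blocks, block_id):
    """Append block_id to the tail of the list described by head."""
    blocks[block_id]["next_block_id"] = RESERVED
    if head["block_count"] == 0:
        head["first_block"] = block_id
    else:
        blocks[head["last_block"]]["next_block_id"] = block_id
    head["last_block"] = block_id
    head["block_count"] += 1

def get_first_block(head, blocks):
    """Remove and return the first block id, or None when the list is empty."""
    if head["block_count"] == 0:
        return None
    block_id = head["first_block"]
    head["first_block"] = blocks[block_id]["next_block_id"]
    blocks[block_id]["next_block_id"] = RESERVED
    head["block_count"] -= 1
    if head["block_count"] == 0:
        head["last_block"] = RESERVED
    return block_id

# Four buffers, all initially on the free list.
blocks = {i: {"next_block_id": RESERVED, "next_chunk_block_id": RESERVED}
          for i in range(4)}
free_list, msg_list = new_list_head(), new_list_head()
for i in blocks:
    add_block(free_list, blocks, i)

# Build a two-chunk message: take two free buffers and chain them.
first = get_first_block(free_list, blocks)
second = get_first_block(free_list, blocks)
blocks[first]["next_chunk_block_id"] = second
add_block(msg_list, blocks, first)  # only the first chunk joins the ready list
```

Note that the termination tests rely on block_count rather than on the reserved marker, in line with the description of RESERVED_BLOCK_ID.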

Messages are serialized for transfer from the sender to the receiver. The serialized size of a message may not exceed the chunk size times the maximum queue size. If the deadlock_immanent_check is enabled (which is True by default), a ValueError will be raised on an attempt to put a message that is too large.
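The size limit can be checked before calling put. The following is a minimal sketch, assuming the default pickle serializer and ignoring any per-chunk overhead; fits_in_queue and chunks_needed are hypothetical helpers, not part of ShmQueue.

```python
import math
import pickle

def chunks_needed(serialized_size: int, chunk_size: int) -> int:
    """Number of chunk buffers a serialized message of this size occupies."""
    return max(1, math.ceil(serialized_size / chunk_size))

def fits_in_queue(msg, chunk_size: int, maxsize: int) -> bool:
    """True when the pickled message does not exceed chunk_size * maxsize."""
    return len(pickle.dumps(msg)) <= chunk_size * maxsize

chunk_size, maxsize = 4096, 10  # capacity: 4096 * 10 = 40960 bytes
small = b"x" * 1000
large = b"x" * 50000

print(fits_in_queue(small, chunk_size, maxsize))  # prints True
print(fits_in_queue(large, chunk_size, maxsize))  # prints False
print(chunks_needed(10000, chunk_size))           # prints 3
```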

Parameters:
  • chunk_size (int, optional) – Size of each chunk. By default, it is ShmQueue.DEFAULT_CHUNK_SIZE (1*1024*1024). If it is 0, it will be set to ShmQueue.MAX_CHUNK_SIZE (512*1024*1024).
  • maxsize (int, optional) – Maximum queue size, i.e. the maximum number of chunks available to a queue. If it is 0, it will be set to ShmQueue.DEFAULT_MAXSIZE (2), which is also the default.
  • serializer (obj, optional) – Serializer to serialize and deserialize data. If it is None (default), pickle will be used. The serializer should implement loads(bytes data) -> object and dumps(object obj) -> bytes.
  • integrity_check (bool, optional) – When True, perform certain integrity checks on messages. 1) After serializing a message, immediately deserialize it to check for validity. 2) Save the length of a message after serialization. 3) Compute a checksum of each chunk of the message. 4) Include the total message size and chunk checksum in the metadata for each chunk. 5) When pulling a chunk from the queue, verify the chunk checksum. 6) After reassembling a message out of chunks, verify the total message size.
  • deadlock_check (bool, optional) – When fetching a writable block, print a message if two or more loops are needed to get a free block. (default is False)
  • deadlock_immanent_check (bool, optional) – Raise a ValueError if a message submitted to put(…) is too large to process. (Default is True)
  • watermark_check (bool, optional) – When True, print a message reporting the largest message size seen so far, in chunks.
  • use_semaphores (bool, optional) – When true, use semaphores to control access to the free list and the message list. The system will sleep when accessing these shared resources, instead of entering a polling loop.
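The serializer parameter expects an object that provides dumps and loads. As a rough illustration, the hypothetical JsonSerializer below exposes that pair using the standard json module. Whether ShmQueue hands loads a bytes object or a memoryview is not stated here, so the sketch converts its input defensively.

```python
import json

class JsonSerializer:
    """Hypothetical serializer exposing the dumps/loads pair."""

    @staticmethod
    def dumps(obj) -> bytes:
        return json.dumps(obj).encode("utf-8")

    @staticmethod
    def loads(data) -> object:
        # bytes() also accepts a memoryview, in case the queue passes one
        return json.loads(bytes(data).decode("utf-8"))

payload = {"id": 7, "tags": ["a", "b"]}
round_trip = JsonSerializer.loads(JsonSerializer.dumps(payload))
print(round_trip == payload)  # prints True
```

Such an object would presumably be passed as ShmQueue(serializer=JsonSerializer). Unlike pickle, JSON only handles basic types, so this choice trades generality for a portable wire format.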

Note

  • close needs to be invoked once to release memory and avoid a memory leak.
  • qsize, empty and full are implemented but may block.
  • Each shared queue consumes one shared memory area for the shared list heads and one shared memory area for each shared buffer. The underlying code in multiprocessing.shared_memory.SharedMemory consumes one process file descriptor for each shared memory area. There is a limit on the number of file descriptors that a process may have open.
  • Thus, there is a tradeoff between the chunk_size and maxsize: smaller chunks use memory more effectively with some overhead cost, but may run into the limit on the number of open file descriptors to process large messages and avoid blocking. Larger chunks waste unused space, but are less likely to run into the open file descriptor limit or to block waiting for a free buffer.
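The tradeoff between chunk_size and maxsize can be made concrete with some arithmetic. The sketch below is an estimate only: buffer_plan is a hypothetical helper, and it ignores the per-buffer metadata area.

```python
import math

def buffer_plan(message_size: int, chunk_size: int) -> dict:
    """Estimate buffers, shared memory areas and padding for one message."""
    buffers = max(1, math.ceil(message_size / chunk_size))
    return {
        "buffers": buffers,
        # one shared memory area per buffer plus one for the list heads
        "shared_areas": buffers + 1,
        "unused_bytes": buffers * chunk_size - message_size,
    }

message_size = 10 * 1024 * 1024  # a 10 MiB serialized message
for chunk_size in (64 * 1024, 1024 * 1024, 16 * 1024 * 1024):
    print(chunk_size, buffer_plan(message_size, chunk_size))
```

With 64 KiB chunks the message needs 160 buffers and therefore 161 shared memory areas, each costing a file descriptor. With 16 MiB chunks it needs a single buffer but leaves 6 MiB of that buffer unused. On Unix-like systems the current descriptor limit can be read with resource.getrlimit(resource.RLIMIT_NOFILE).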

Example:

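from multiprocessing import Process

from pyrallel.queue import ShmQueue

def run(q):
    # Runs in the child process: block until one item arrives, then print it.
    e = q.get()
    print(e)

if __name__ == '__main__':
    q = ShmQueue(chunk_size=1024 * 4, maxsize=10)
    p = Process(target=run, args=(q,))
    p.start()
    q.put(100)
    p.join()
    q.close()  # release the shared memory areas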
DEFAULT_CHUNK_SIZE = 1048576

The default size for a buffer chunk.

Type:int
DEFAULT_MAXSIZE = 2

The default maximum size for a queue.

Type:int
FREE_LIST_HEAD = 0

The index of the free buffer list head in the SharedMemory segment for sharing message queue list heads between processes.

Type:int
LIST_HEAD_SIZE = 12

The length of a list head structure in bytes.

Type:int
LIST_HEAD_STRUCT = {'block_count': (8, 12, 'I'), 'first_block': (0, 4, 'I'), 'last_block': (4, 8, 'I')}

The list head structure parameters for struct.pack(…) and struct.unpack(…). The list header structure maintains a block count in addition to first_block and last_block pointers.
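Each entry in LIST_HEAD_STRUCT reads as (start offset, end offset, struct format). A minimal sketch of how such a table can drive struct.pack and struct.unpack follows. It uses a bytearray as a stand-in for the shared memory area and assumes the two list heads are stored back to back, so that head lh starts at lh * LIST_HEAD_SIZE. The helpers set_field and get_field are hypothetical.

```python
import struct

LIST_HEAD_SIZE = 12
LIST_HEAD_STRUCT = {
    "first_block": (0, 4, "I"),
    "last_block": (4, 8, "I"),
    "block_count": (8, 12, "I"),
}

def set_field(buf, lh: int, name: str, value: int) -> None:
    """Write one unsigned 32-bit field of list head number lh."""
    start, end, fmt = LIST_HEAD_STRUCT[name]
    base = lh * LIST_HEAD_SIZE  # assumption: heads are stored back to back
    buf[base + start:base + end] = struct.pack(fmt, value)

def get_field(buf, lh: int, name: str) -> int:
    """Read one unsigned 32-bit field of list head number lh."""
    start, end, fmt = LIST_HEAD_STRUCT[name]
    base = lh * LIST_HEAD_SIZE
    return struct.unpack(fmt, buf[base + start:base + end])[0]

# A bytearray stands in for the shared memory area holding both list heads.
heads = bytearray(2 * LIST_HEAD_SIZE)
set_field(heads, 1, "first_block", 3)
set_field(heads, 1, "block_count", 1)
print(get_field(heads, 1, "first_block"))  # prints 3
```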

MAX_CHUNK_SIZE = 536870912

The maximum allowable size for a buffer chunk. 512MB should be a large enough value.

Type:int
META_BLOCK_SIZE = 44

The length of the buffer metadata structure in bytes.

Type:int
META_STRUCT = {'checksum': (28, 32, 'I'), 'chunk_id': (16, 20, 'I'), 'msg_id': (0, 12, '12s'), 'msg_size': (12, 16, 'I'), 'next_block_id': (40, 44, 'I'), 'next_chunk_block_id': (36, 40, 'I'), 'src_pid': (32, 36, 'I'), 'total_chunks': (20, 24, 'I'), 'total_msg_size': (24, 28, 'I')}

The per-buffer metadata structure parameters for struct.pack(…) and struct.unpack(…).
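The metadata table can be exercised the same way as the list head table. The sketch below writes one chunk and verifies it on read. Several points are assumptions rather than documented behavior: the metadata is placed before the payload, msg_size holds the payload size of this chunk, and zlib.crc32 serves as the checksum. The helpers are hypothetical and a bytearray stands in for a shared buffer.

```python
import struct
import zlib

META_BLOCK_SIZE = 44
META_STRUCT = {
    "msg_id": (0, 12, "12s"), "msg_size": (12, 16, "I"),
    "chunk_id": (16, 20, "I"), "total_chunks": (20, 24, "I"),
    "total_msg_size": (24, 28, "I"), "checksum": (28, 32, "I"),
    "src_pid": (32, 36, "I"), "next_chunk_block_id": (36, 40, "I"),
    "next_block_id": (40, 44, "I"),
}

def set_meta(buf, name, value):
    start, end, fmt = META_STRUCT[name]
    buf[start:end] = struct.pack(fmt, value)

def get_meta(buf, name):
    start, end, fmt = META_STRUCT[name]
    return struct.unpack(fmt, buf[start:end])[0]

def write_chunk(buf, msg_id: bytes, chunk_id: int, data: bytes, total_size: int):
    """Fill one buffer: metadata first, chunk payload right after it."""
    set_meta(buf, "msg_id", msg_id)
    set_meta(buf, "chunk_id", chunk_id)
    set_meta(buf, "msg_size", len(data))
    set_meta(buf, "total_msg_size", total_size)
    set_meta(buf, "checksum", zlib.crc32(data))  # assumed checksum function
    buf[META_BLOCK_SIZE:META_BLOCK_SIZE + len(data)] = data

def read_chunk(buf) -> bytes:
    """Return the payload after verifying its checksum."""
    size = get_meta(buf, "msg_size")
    data = bytes(buf[META_BLOCK_SIZE:META_BLOCK_SIZE + size])
    if zlib.crc32(data) != get_meta(buf, "checksum"):
        raise ValueError("chunk checksum mismatch")
    return data

buffer = bytearray(META_BLOCK_SIZE + 64)  # stand-in for one shared buffer
write_chunk(buffer, b"msg-00000001", 1, b"hello", total_size=5)
print(read_chunk(buffer))  # prints b'hello'
```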

MSG_LIST_HEAD = 1

The index of the queued message list head in the SharedMemory segment for sharing message queue list heads between processes.

Type:int
RESERVED_BLOCK_ID = 4294967295

RESERVED_BLOCK_ID is stored in the list head pointer and next chunk block id fields to indicate that there is no next block. This value is intended to simplify debugging by removing stale next-block values. It is not used to test for block chain termination; counters are used for that purpose instead.

Type:int
__getstate__()

This routine retrieves queue information when forking a new process.

__setstate__(state)

This routine saves queue information when forking a new process.

add_block(lh: int, block_id: int)

Add a block to a block list.

Parameters:lh (int) – The index of the list head in the list head shared memory area.
add_free_block(block_id: int)

Return a block to the free block list.

Parameters:block_id (int) – The identifier of the block being returned.
add_msg(block_id: int)

Add a message to the available message list.

Parameters:block_id (int) – The block identifier of the first chunk of the message.
close()

Indicate no more new data will be added and release the shared memory areas.

consume_msg_id()

Consume a message identifier.

Note

Message identifiers are consumed when we are certain that we can process the message. They will not be consumed if we start to process a message but fail due to a condition such as insufficient free buffers.

empty() → bool

bool: True when no messages are ready.

full() → bool

bool: True when no free blocks are available.

generate_msg_id() → bytes

bytes: Generate the next message identifier, but do not consume it.

Note

Message IDs are assigned independently by each process using the queue. They need to be paired with the source process ID to be used to identify a message for debugging.
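The split between generating and consuming an identifier can be sketched as a peek followed by a commit. The class below is a hypothetical illustration, not the library's implementation. It assumes a per-process counter encoded as 12 big-endian bytes, chosen only because the msg_id metadata field is 12 bytes wide.

```python
import os

class MsgIdSource:
    """Hypothetical per-process counter handing out 12-byte message ids."""

    def __init__(self):
        self._next = 0

    def generate_msg_id(self) -> bytes:
        # Peek at the next id without advancing the counter.
        return self._next.to_bytes(12, "big")

    def consume_msg_id(self) -> None:
        # Advance only once the message is certain to be processed.
        self._next += 1

ids = MsgIdSource()
msg_id = ids.generate_msg_id()
have_free_buffers = False            # pretend the first attempt fails
if have_free_buffers:
    ids.consume_msg_id()
retry_id = ids.generate_msg_id()     # the same id is offered again
debug_key = (os.getpid(), retry_id)  # pid plus id identifies the message
print(msg_id == retry_id)            # prints True
```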

get(block: bool = True, timeout: Optional[float] = None) → Any

Get the next available message from the queue.

Parameters:
  • block (bool, optional) – If it is set to True (default), it will only return when an item is available.
  • timeout (float, optional) – A positive number giving the timeout duration in seconds, which is only effective when block is set to True.
Returns:

A message object retrieved from the queue.

Return type:

object

Raises:
  • queue.Empty – Raised if the call times out, or if the queue is empty when block is False.
  • ValueError – An internal error occurred in accessing the message’s metadata.
  • UnpicklingError – Raised when the serializer is pickle and an error occurred in deserializing the message.

Note

  • Errors other than UnpicklingError might be raised if a serializer other than pickle is specified.
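A typical consumer loop built on get with a timeout might look like the sketch below. Because the interface mirrors the standard queue interface, the standard library's queue.Queue is used as a stand-in so that the example runs without shared memory. The helper drain is hypothetical.

```python
import queue

def drain(q, timeout: float = 0.1) -> list:
    """Collect messages until the queue stays empty for `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            return items

stand_in = queue.Queue()  # stand-in with the same get/put signature
for n in range(3):
    stand_in.put(n)
print(drain(stand_in))    # prints [0, 1, 2]
```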
get_block_count(lh: int) → int

int: Get the count of blocks queued in a block list.

Parameters:lh (int) – The index of the list head in the list head shared memory area.
get_data(block: multiprocessing.shared_memory.SharedMemory, data_size: int) → bytes

bytes: Get a memoryview of a shared memory data block.

Parameters:
  • block (SharedMemory) – The shared memory block.
  • data_size (int) – The number of bytes in the returned memoryview slice.
get_first_block(lh: int) → Optional[int]

Get the first block on a block list, updating the list head fields.

Parameters:lh (int) – The index of the list head in the list head shared memory area.
Returns:The block_id of the first available block, or None if no block is available.
Return type:Optional[int]
get_first_free_block(block: bool, timeout: Optional[float]) → Optional[int]

Get the first free block.

When using semaphores, optionally block with an optional timeout. If you choose to block without a timeout, the method will not return until a free block is available.
Parameters:
  • block (bool) – When True, and when using semaphores, wait until a free block is available or a timeout occurs.
  • timeout (typing.Optional[float]) – When block is True and timeout is positive, block for at most timeout seconds attempting to acquire the free block.
Returns:

The block_id of the first available block, or None if no block is available.

Return type:

Optional[int]
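The blocking behavior with semaphores can be illustrated with a toy free list. This is a single-process sketch under stated assumptions: threading.Semaphore stands in for whatever cross-process semaphore the queue uses, and the FreeList class is hypothetical.

```python
import threading
from collections import deque
from typing import Optional

class FreeList:
    """Toy free list whose semaphore counts the available blocks."""

    def __init__(self, block_ids):
        self._blocks = deque(block_ids)
        self._lock = threading.Lock()
        self._available = threading.Semaphore(len(self._blocks))

    def get_first_free_block(self, block: bool,
                             timeout: Optional[float]) -> Optional[int]:
        # acquire() sleeps instead of polling; it returns False on timeout
        # or, in nonblocking mode, when no block is available right now.
        if block:
            acquired = self._available.acquire(timeout=timeout)
        else:
            acquired = self._available.acquire(blocking=False)
        if not acquired:
            return None
        with self._lock:
            return self._blocks.popleft()

    def add_free_block(self, block_id: int) -> None:
        with self._lock:
            self._blocks.append(block_id)
        self._available.release()  # wake one waiter, if any

free = FreeList([0, 1])
print(free.get_first_free_block(block=True, timeout=None))  # prints 0
```

The semaphore count always equals the number of queued blocks, so a successful acquire guarantees that popleft finds a block.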

get_first_msg(block: bool, timeout: Optional[float]) → Optional[int]

Take the first available message, if any, from the available message list.

When using semaphores, optionally block with an optional timeout. If you choose to block without a timeout, the method will not return until a message is available.
Parameters:
  • block (bool) – When True, and when using semaphores, wait until a message is available or a timeout occurs.
  • timeout (typing.Optional[float]) – When block is True and timeout is positive, block for at most timeout seconds attempting to acquire the message.
Returns:

The block_id of the first chunk of the first available message, or None if no message is available.

Return type:

Optional[int]

get_free_block_count() → int

int: Get the number of free blocks.

get_list_head_field(lh: int, type_: str) → int

int: Get a field from a list head.

Parameters:
  • lh (int) – The index of the list head in the list head shared memory.
  • type_ (str) – The name of the list head field.
get_meta(block: multiprocessing.shared_memory.SharedMemory, type_: str) → Union[bytes, int]

typing.Union[bytes, int]: Get a field from a block’s metadata area in shared memory.

Parameters:
  • block (SharedMemory) – The shared memory for the data block.
  • type_ (str) – The name of the metadata field to extract.
get_msg_count() → int

int: Get the number of messages on the message list.

get_nowait() → Any

Equivalent to get(False).

init_list_head(lh: int)

Initialize a block list, clearing the block count and setting the first_block and last_block fields to the reserved value that indicates that they are void pointers.

Parameters:lh (int) – The index of the list head in the list head shared memory area.
next_readable_msg(block: bool, timeout: Optional[float] = None) → Tuple[int, bytes, int, int, int]

Get the next available message, with blocking and timeouts.

This method returns a 5-tuple: the data block and certain metadata. The reason for this complexity is to retrieve the metadata under a single access lock.

Parameters:
  • block (bool) – When True, and when using semaphores, wait until a message is available or a timeout occurs.
  • timeout (typing.Optional[float]) – When block is True and timeout is positive, block for at most timeout seconds attempting to acquire the message.
Returns:

A 5-tuple (src_pid, msg_id, block_id, total_chunks, next_chunk_block_id). src_pid (int): The process identifier of the process that originated the message. msg_id (bytes): The message identifier. block_id (int): The identifier for the first chunk in the message. total_chunks (int): The total number of chunks in the message. next_chunk_block_id (int): The identifier for the next chunk in the message.

Return type:

Tuple[int, bytes, int, int, int]

Raises:
  • queue.Empty – No messages are available, either because the call was nonblocking or because a timeout occurred.
  • ValueError – An internal error occurred in accessing the message’s metadata.
next_writable_block_id(block: bool, timeout: Optional[float], msg_id: bytes, src_pid: int) → int

int: Get the block ID of the first free block.

Get the block ID of the first free block, supporting blocking/nonblocking modes and timeouts when blocking, even when semaphores are not being used. Store in the block’s metadata area the message ID for the message we are building and the pid of the process acquiring the block.

Parameters:
  • block (bool) – When True, and when using semaphores, wait until a free block is available or a timeout occurs.
  • timeout (typing.Optional[float]) – When block is True and timeout is positive, block for at most timeout seconds attempting to acquire the free block.
  • msg_id (bytes) – The message ID assigned to the message being built.
  • src_pid (int) – The process ID (pid) of the process that is acquiring the block.
Raises:

queue.Full – No block is available. Full is raised immediately in nonblocking mode, or after the timeout in blocking mode when a timeout is specified.
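Supporting blocking and timeouts without semaphores implies some form of polling. The sketch below shows one way such a loop could be written. It is not the library's implementation: the try_get_block callable and the poll_interval parameter are hypothetical, and the step that stores msg_id and src_pid in the block's metadata is omitted.

```python
import queue
import time

def next_writable_block_id(try_get_block, block: bool, timeout=None,
                           poll_interval: float = 0.01) -> int:
    """Poll try_get_block() until it yields a block id or the deadline passes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        block_id = try_get_block()
        if block_id is not None:
            return block_id
        if not block:
            raise queue.Full  # nonblocking: fail immediately
        if deadline is not None and time.monotonic() >= deadline:
            raise queue.Full  # blocking: fail once the timeout expires
        time.sleep(poll_interval)

free_blocks = [5]

def try_get_block():
    # Hypothetical nonblocking source of free block ids.
    return free_blocks.pop() if free_blocks else None

print(next_writable_block_id(try_get_block, block=False))  # prints 5
```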

put(msg: Any, block: bool = True, timeout: Optional[float] = None)

Put an object into a shared memory queue.

Parameters:
  • msg (obj) – The object to put into the queue.
  • block (bool, optional) – If it is set to True (default), the call returns only after the item has been put into the queue.
  • timeout (float, optional) – A positive number giving the timeout duration in seconds, which is only effective when block is set to True.
Raises:
  • queue.Full – Raised if the call times out or the queue is full when block is False.
  • ValueError – An internal error occurred in accessing the message’s metadata.
  • ValueError – A request was made to send a message that, when serialized, exceeds the capacity of the queue.
  • PicklingError – Raised when the serializer is pickle and an error occurred in serializing the message.
  • UnpicklingError – Raised when the serializer is pickle and an error occurred in deserializing the message for an integrity check.

Note

  • Errors other than PicklingError might be raised if a serializer other than pickle is specified.
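A producer that must not wait forever can combine put with a timeout and handle queue.Full. The sketch below uses the standard library's queue.Queue as a stand-in with the same put signature, so that it runs without shared memory. The helper try_put is hypothetical.

```python
import queue

def try_put(q, msg, timeout: float = 0.1) -> bool:
    """Return True if msg was queued, False if the queue stayed full."""
    try:
        q.put(msg, block=True, timeout=timeout)
        return True
    except queue.Full:
        return False

stand_in = queue.Queue(maxsize=1)   # stand-in with the same put signature
print(try_put(stand_in, "first"))   # prints True
print(try_put(stand_in, "second"))  # prints False
```

The helper deliberately does not catch ValueError. An oversized message will never fit, so retrying it would not help.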
put_nowait(msg: Any)

Equivalent to put(msg, False).

qid_counter = 0

Each message queue has a queue ID (qid) that identifies the queue in debugging messages. This mutable class counter is used to create new queue ID values for newly created queues. Implicitly, this assumes that message queues will be created by a single initialization process and then distributed to worker processes. If shared message queues will be created by multiple processes, then the queue ID should be altered to incorporate the process ID (pid) of the process that created the shared message queue, or an additional field should be created and presented with the pid of the shared message queue’s creator.

Type:int
qsize() → int

int: Return the number of ready messages.