mpsf package

Module contents

class mpsf.DataParser(data: Iterable, data_length: int)

Bases: object

Class for creating an object that counts the number of rows that have been retrieved from data. Note that to use this, the total number of data points, data_length, must be known.
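The counting behaviour described above can be sketched as an iterator wrapper. This is a minimal illustration, not mpsf's implementation: the class name CountingIterator, its count attribute and its progress() method are hypothetical. Knowing data_length up front is presumably what allows progress to be reported for iterables, such as generators, that have no len().

```python
from typing import Any, Iterable


class CountingIterator:
    """Hypothetical stand-in for mpsf.DataParser: wraps an iterable and counts rows handed out."""

    def __init__(self, data: Iterable, data_length: int) -> None:
        self._it = iter(data)
        self.data_length = data_length  # must be known in advance, as the docs require
        self.count = 0  # number of rows retrieved so far

    def __iter__(self) -> "CountingIterator":
        return self

    def __next__(self) -> Any:
        row = next(self._it)  # StopIteration propagates once the data runs out
        self.count += 1  # only counted when a row was actually returned
        return row

    def progress(self) -> float:
        """Fraction of rows retrieved; 1.0 for an empty data set."""
        return self.count / self.data_length if self.data_length else 1.0
```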

class mpsf.StopNow

Bases: object

Class used to detect when a queue is empty.
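Detecting the end of the data with a marker class can be sketched with the general sentinel pattern, assuming a StopNow instance is placed on the queue after the last row. This is not mpsf's code: StopNow below is a local stand-in and drain_until_stop is a hypothetical helper. The check uses isinstance rather than identity because an object sent through a multiprocessing queue is pickled and arrives in the other process as a new instance.

```python
import queue


class StopNow:
    """Hypothetical stand-in for mpsf.StopNow: a marker object, never real data."""


def drain_until_stop(q: "queue.Queue") -> list:
    """Collect items from q until a StopNow marker is seen; the marker itself is discarded."""
    items = []
    while True:
        item = q.get()
        # isinstance survives pickling; `item is some_marker` would not across processes.
        if isinstance(item, StopNow):
            return items
        items.append(item)
```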

mpsf.populate_queue(queue: multiprocessing.context.BaseContext.Queue, data_container: Iterable, n_workers: int, fill_pause=10, max_queue_size=100)

Populates the queue holding the data.

Parameters
  • queue – Queue to place data into.

  • data_container – Iterable that contains data.

  • n_workers – The number of workers initialized.

  • fill_pause – The pause between successive checks of the queue size. This keeps the loop, which does nothing but check qsize, from overusing CPU power. This might not be relevant; I have not tested it.

  • max_queue_size – The maximum size of the queue. This limits the memory used throughout the process.
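A minimal sketch of the fill loop described above, assuming fill_pause is in seconds and that, as the n_workers parameter suggests, one StopNow marker is queued per worker once the data runs out. StopNow here is a local stand-in, and the body is illustrative rather than mpsf's actual code. The demonstration uses the thread-safe queue.Queue; multiprocessing.Queue offers the same qsize() and put() calls, although its qsize() may raise NotImplementedError on platforms such as macOS.

```python
import queue
import time
from typing import Iterable


class StopNow:
    """Hypothetical end-of-data marker, standing in for mpsf.StopNow."""


def populate_queue(q: "queue.Queue", data_container: Iterable, n_workers: int,
                   fill_pause: float = 10, max_queue_size: int = 100) -> None:
    """Sketch: keep q at most max_queue_size long, then send one StopNow per worker."""
    for row in data_container:
        # Poll until there is room; sleeping fill_pause seconds between checks
        # stops this loop from spinning on qsize() and burning CPU.
        while q.qsize() >= max_queue_size:
            time.sleep(fill_pause)
        q.put(row)
    # One marker per worker, so every worker receives exactly one and exits.
    for _ in range(n_workers):
        q.put(StopNow())
```

With a single producer, qsize() can only shrink between the check and the put(), so the queue should not grow past max_queue_size rows (the trailing markers aside).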

mpsf.smart_multi_proc(work_function: Callable, data: Iterable, n_processes=4, request_delay=5, fill_pause=10, max_queue_size=100)

Runs work_function once for each row in data, so the length of data equals the total number of runs. Data is fed to the function through a pipe, which can hold at most max_queue_size objects at a time. The delay before a new input for the function is requested is set with request_delay, and the time between refills of the queue is set with fill_pause.

Parameters
  • work_function – Function to run.

  • data – Iterator whose __next__() returns the next row of data.

  • n_processes – The number of concurrent processes.

  • request_delay – The delay between the function finishing one job and starting a new one.

  • fill_pause – The delay between each request to refill the pipe with data rows.

  • max_queue_size – The maximum queue size.
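The overall flow of smart_multi_proc can be sketched with the standard multiprocessing module. This is a hypothetical sketch, not mpsf's code: smart_multi_proc_sketch and _run_worker are invented names, it collects return values in a second queue (the docs above do not say whether smart_multi_proc returns anything), and it assumes a POSIX system where the "fork" start method is available. It also swaps the fill_pause and request_delay polling for blocking put() and get() calls on a bounded queue, so the waiting happens inside the queue rather than in sleep loops.

```python
import multiprocessing as mp
from typing import Any, Callable, Iterable, List


class StopNow:
    """Hypothetical end-of-data marker, standing in for mpsf.StopNow."""


def _run_worker(task_q, result_q, work_function: Callable) -> None:
    # Block on get() until a row arrives; exit when this worker's StopNow arrives.
    while True:
        item = task_q.get()
        if isinstance(item, StopNow):
            return
        result_q.put(work_function(item))


def smart_multi_proc_sketch(work_function: Callable, data: Iterable,
                            n_processes: int = 4, max_queue_size: int = 100) -> List[Any]:
    """Hypothetical sketch: run work_function once per row of data across n_processes."""
    # Assumption: POSIX, where "fork" exists and children do not re-import the script.
    ctx = mp.get_context("fork")
    task_q = ctx.Queue(maxsize=max_queue_size)  # bounded, so put() blocks when full
    result_q = ctx.Queue()
    procs = [ctx.Process(target=_run_worker, args=(task_q, result_q, work_function))
             for _ in range(n_processes)]
    for p in procs:
        p.start()
    n_rows = 0
    for row in data:
        task_q.put(row)  # waits here while max_queue_size rows are already queued
        n_rows += 1
    for _ in procs:
        task_q.put(StopNow())  # one marker per worker
    # Drain every result before joining: a child that has put items on a queue
    # does not terminate until they are flushed to the underlying pipe.
    results = [result_q.get() for _ in range(n_rows)]
    for p in procs:
        p.join()
    return results
```

Results arrive in completion order, not input order. There is no error handling: if work_function raises, that worker dies and the parent waits forever for the missing result.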

mpsf.worker(queue: multiprocessing.context.BaseContext.Queue, computation_function: Callable, identifier: int, request_delay=5)

Function responsible for retrieving a data point and passing it on to the computation function.

Parameters
  • queue – Queue with data in it.

  • computation_function – Function that does computation with the data point as input.

  • identifier – Identifier for the runner; each runner should have a different identifier.

  • request_delay – If all runners can compute faster than the queue can be populated, this delay is imposed each time the queue is detected to be empty.
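The retrieval loop described above might look like the following sketch. It assumes request_delay is in seconds, that the worker stops when it receives a StopNow marker, and that it returns the number of rows it processed; all three are assumptions, and StopNow is a local stand-in. get_nowait() raises queue.Empty for both queue.Queue and multiprocessing.Queue, so the same loop works with either.

```python
import queue
import time
from typing import Callable


class StopNow:
    """Hypothetical end-of-data marker, standing in for mpsf.StopNow."""


def worker(q, computation_function: Callable, identifier: int,
           request_delay: float = 5) -> int:
    """Sketch: feed rows from q to computation_function until a StopNow arrives.

    identifier is kept only for signature parity; this sketch does not use it.
    Returns the number of rows processed (an assumption for illustration).
    """
    processed = 0
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            # Workers are outpacing the producer: back off before asking again.
            time.sleep(request_delay)
            continue
        if isinstance(item, StopNow):
            return processed
        computation_function(item)
        processed += 1
```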