The processing package mostly replicates the API of the threading module.
- class Process(group=None, target=None, name=None, args=(), kwargs={})
An analogue of threading.Thread.
See Process objects.
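A minimal sketch of the Thread-style usage (the greet function here is just an illustration):

from processing import Process

def greet(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=greet, args=('world',))
    p.start()               # run greet('world') in a child process
    p.join()                # wait for the child process to finish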
- exception ProcessExit
- Exception raised in a target process when the Process.stop() method is used. This is a subclass of SystemExit.
- exception BufferTooShort
Exception raised by the recvbytes_into() method of a connection object when the supplied buffer object is too small for the message read.
If e is an instance of BufferTooShort then e.args[0] will give the message as a byte string.
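For illustration, a sketch of catching the exception when a deliberately undersized buffer is supplied (the buffer size and message are arbitrary):

from array import array
from processing import Pipe, BufferTooShort

a, b = Pipe()
a.sendbytes('this message is longer than the buffer')
buffer = array('c', ' ' * 8)        # deliberately too small
try:
    b.recvbytes_into(buffer)
except BufferTooShort, e:
    print 'complete message was: %r' % (e.args[0],)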
When using multiple processes one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.
For passing messages one can use a pipe (for a connection between two processes) or a queue (which allows multiple producers and consumers).
Note that one can also create a shared queue by using a manager object -- see Managers.
For an example of the usage of queues for interprocess communication see test_workers.py.
- Pipe()
Returns a pair of connection objects representing the ends of a duplex pipe.
These connection objects can be inherited by child processes and have methods send() and recv() (among others) for sending and receiving picklable objects. (See Connection objects.) For example:
>>> from processing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.sendbytes('thank you')
>>> a.recvbytes()
'thank you'

Note that it is not safe to have more than one process (or thread) reading from or writing to the same end of a pipe at the same time.
Also note that if a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted (because it may become impossible to be sure where the message boundaries lie).
On Windows this requires the _processing extension.
- Queue(maxsize=0)
Returns a process shared queue implemented using a pipe and a few locks/semaphores. A background thread transfers objects from a buffer into the pipe.
It is a near clone of Queue.Queue except that the qsize() method is not implemented and that the task_done() and join() methods introduced in Python 2.5 are also missing.
Queue has a few additional methods:
- putmany(iterable)
If the queue has infinite size then this adds all items in the iterable to the queue's buffer. So q.putmany(X) is a faster alternative to the loop for x in X: q.put(x). Raises an error if the queue has finite size. (A sketch using these methods follows this list.)
- close()
Indicates that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all its data. This is called automatically when the queue is garbage collected.
- jointhread()
This joins the background thread and can only be used after close() has been called. This blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
By default if a process is not the creator of the queue then on exit it will attempt to join the queue's background thread. The process can call canceljoin() to prevent this behaviour.
- canceljoin()
- Prevents the background thread from being joined automatically when the process exits. Unnecessary if the current process created the queue.
Note that if a process is killed while it is trying to receive or send to a queue then the data in the queue is likely to become corrupted (because it may become impossible to be sure where the message boundaries lie).
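A sketch combining these methods, assuming the default infinite-size queue (the producer function is illustrative):

from processing import Process, Queue

def producer(queue):
    queue.putmany(range(5))   # faster than putting the items one at a time
    queue.close()             # no more data will come from this process
    queue.jointhread()        # block until the background thread has flushed the buffer

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=producer, args=(queue,))
    p.start()
    for i in range(5):
        print queue.get()
    p.join()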
- SimpleQueue()
A simplified and faster alternative to Queue(). It is really just a pipe protected by a couple of locks.
It has get() and put() methods but these do not have block or timeout arguments. It also has an empty() method.
Warning: Unlike with Queue (when maxsize is zero) using put() may block if the pipe's buffer does not have sufficient space, so one must take care that no deadlocks are possible.
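A minimal sketch of SimpleQueue usage (the worker function is illustrative):

from processing import Process, SimpleQueue

def worker(queue):
    queue.put('finished')

if __name__ == '__main__':
    queue = SimpleQueue()
    p = Process(target=worker, args=(queue,))
    p.start()
    print queue.get()         # blocks until an item arrives; no timeout argument
    p.join()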
- PosixQueue(maxsize=0, msgsize=0)
A faster alternative to Queue() which is available on Unix systems that support Posix message queues.
However, Posix queues have a maximum number of messages that can occupy the queue at a given time, and each message (when expressed as a pickled string) has a maximum length --- see man 7 mq_overview.
maxsize if specified determines the maximum number of items that can be in the queue. Note that, unlike Python's normal Queue type, if this is greater than a system-defined maximum then an error is raised. If maxsize is zero then this maximum value is used.
msgsize if specified determines the maximum length that each message can be (when expressed as a pickled string). If this is greater than a system-defined maximum then an error is raised. If msgsize is zero then this maximum value is used.
If one tries to send a message which is too long then ValueError will be raised.
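A sketch, assuming a Unix system with Posix message queue support (the limits chosen here are arbitrary):

from processing import PosixQueue

if __name__ == '__main__':
    queue = PosixQueue(maxsize=10, msgsize=512)
    queue.put('a small message')
    print queue.get()
    try:
        queue.put('x' * 10000)    # pickled string exceeds msgsize
    except ValueError:
        print 'message too long for this queue'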
Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.
Note that one can also create synchronization primitives by using a manager object -- see Managers.
The following all require support for native semaphores from the _processing extension.
- BoundedSemaphore(value=1)
- Returns a bounded semaphore object: a clone of threading.BoundedSemaphore.
- Condition(lock=None)
Returns a condition variable: a clone of threading.Condition.
If lock is specified then it should be a Lock or RLock object from processing.
- Event()
- Returns an event object: a clone of threading.Event.
- Lock()
Returns a non-recursive lock object: a near clone of threading.Lock.
There are two differences from threading.Lock: trying to acquire a lock already owned by the current thread raises an exception instead of deadlocking; and trying to release a lock held by a different thread/process will raise an exception. (A usage sketch follows this list.)
- RLock()
- Returns a recursive lock object: a clone of threading.RLock.
- Semaphore(value=1)
Returns a semaphore object: a clone of threading.Semaphore.
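For instance, a lock can be used to serialize output from several processes; a minimal sketch (the printer function is illustrative):

from processing import Process, Lock

def printer(lock, msg):
    lock.acquire()
    try:
        print msg             # only one process prints at a time
    finally:
        lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        Process(target=printer, args=(lock, 'message %d' % i)).start()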
Managers provide a way to create data which can be shared between different processes.
- LocalManager()
Returns a manager object which uses shared memory instead of a server process. It has instance methods SharedValue, SharedStruct and SharedArray for creating objects stored in a shared memory map. It also has static methods Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event and Queue, which are just aliases for other functions in the processing namespace. See LocalManager.
Requires support for native semaphores from _processing.
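A sketch, assuming SharedValue() accepts an array-style typecode and an initial value:

from processing import Process, LocalManager

def increment(value):
    value.value += 1          # a read-modify-write; real code should hold a lock

if __name__ == '__main__':
    manager = LocalManager()
    value = manager.SharedValue('i', 0)   # assumed signature: typecode, initial value
    p = Process(target=increment, args=(value,))
    p.start()
    p.join()
    print value.value         # => 1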
- Manager()
Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.
The methods for creating shared objects are
list(), dict(), Namespace(), SharedValue(), SharedStruct(), SharedArray(), Lock(), RLock(), Semaphore(), BoundedSemaphore(), Condition(), Event() and Queue(). For example:
>>> from processing import Manager
>>> manager = Manager()
>>> l = manager.list(range(10))
>>> l.reverse()
>>> print l
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
>>> print repr(l)
<Proxy[list] object at 0x00E1B3B0>

See SyncManager and Proxy objects.
One can create a pool of processes which will carry out tasks submitted to it.
- Pool(processes=None)
Returns a process pool object which controls a pool of worker processes to which jobs can be submitted.
It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
If processes is None then the number returned by cpuCount() is used. See Pool objects.
Example:
from processing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=2)
    result1 = pool.apply_async(f, (10,))
    result2 = pool.map_async(f, range(5))
    print result1.get()           # => "100"
    print result2.get(timeout=1)  # => "[0, 1, 4, 9, 16]"

Requires support for native semaphores from _processing.
Some support for logging is available. Note, however, that the logging package does not use process shared locks so it is possible (depending on the handler type) for messages from different processes to get mixed up.
- enableLogging(level, HandlerType=None, handlerArgs=(), format=None)
Enables logging and sets the debug level used by the package's logger to level -- see documentation for the logging package in the standard library.
If HandlerType is specified then a handler is created using HandlerType(*handlerArgs) and this will be used by the logger -- any previous handlers will be discarded. If format is specified then this will be used for the handler; otherwise format defaults to '[%(levelname)s/%(processName)s] %(message)s'. (The logger used by processing allows use of the non-standard '%(processName)s' format.)
If HandlerType is not specified and the logger has no handlers then a default one is created which prints to sys.stderr.
Note: on Windows a child process does not directly inherit its parent's logger; instead it will automatically call enableLogging() with the same arguments which were used when its parent process last called enableLogging() (if it ever did).
- getLogger()
- Returns the logger used by processing. If enableLogging() has not yet been called then None is returned.
Below is an example session with logging turned on:
>>> import processing, logging
>>> processing.enableLogging(level=logging.INFO)
>>> processing.getLogger().warn('doomed')
[WARNING/MainProcess] doomed
>>> m = processing.Manager()
[INFO/SyncManager-1] process starting up
[INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-1352-0-r97d0b'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-1] manager received shutdown message
[INFO/SyncManager-1] running all "atexit" finalizers
[INFO/SyncManager-1] process exiting with `os.exit(0)`
- activeChildren()
Returns a list of all live children of the current process.
Calling this has the side effect of "joining" any processes which have already finished.
- cpuCount()
- Returns the number of CPUs in the system. May raise NotImplementedError.
- currentProcess()
An analogue of threading.currentThread.
Returns the object corresponding to the current process.
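For example (getName() is assumed here by analogy with threading.Thread):

import processing

print processing.currentProcess().getName()   # e.g. 'MainProcess'
print processing.cpuCount()                   # may raise NotImplementedError
print processing.activeChildren()             # also joins any finished children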
- freezeSupport()
Adds support for programs which use the processing package and have been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)
One needs to call this function straight after the if __name__ == '__main__' line of the main module. For example:

from processing import Process, freezeSupport

def f():
    print "hello world!"

if __name__ == '__main__':
    freezeSupport()
    p = Process(target=f)
    p.start()

If the freezeSupport() line is missed out then the frozen executable produced from this module would (on Windows) recursively create new processes.
If the module is being run normally by the python interpreter then freezeSupport() has no effect.