blob: 6b878b30f1e8c1f81ba420eb9e30a5d2cf5b8e02 [file] [log] [blame]
Yuheng Longf20cffa2013-06-03 18:46:00 -07001"""Pipeline process that encapsulates the actual content.
2
3The actual stages include the Steering algorithm, the builder and the executor.
4"""
5
6__author__ = 'yuhenglong@google.com (Yuheng Long)'
7
8import multiprocessing
9
10
class PipelineProcess(multiprocessing.Process):
    """A process that wraps one stage of the pipeline.

    It continuously pulls tasks from its input queue until a poison pill is
    received. Each task that is pulled is handed to the wrapped stage method
    and then forwarded to the output queue.
    """

    # Sentinel placed on a queue to tell the consuming stage to shut down.
    POISON_PILL = None

    def __init__(self, method, task_queue, result_queue):
        """Set up input/output queue and the actual method to be called.

        Args:
          method: The actual pipeline stage to be invoked. It is called with a
            single argument, the task pulled from task_queue.
          task_queue: The input task queue for this pipeline stage.
          result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)
        self._method = method
        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        """Pull tasks from the input queue and process them until shutdown.

        Each pulled task is passed to the stage method and the task object
        itself (not the method's return value) is then put on the result
        queue.

        On receiving the poison pill, the pill is forwarded to the result
        queue and the loop ends.
        """

        while True:
            # get() blocks until a task (or the poison pill) is available.
            next_task = self._task_queue.get()
            if next_task is self.POISON_PILL:
                # Propagate the shutdown request before stopping.
                self._result_queue.put(self.POISON_PILL)
                break
            self._method(next_task)
            self._result_queue.put(next_task)