Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame^] | 1 | """Pipeline process that encapsulates the actual content. |
| 2 | |
| 3 | The actual stages include the Steering algorithm, the builder and the executor. |
| 4 | """ |
| 5 | |
| 6 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 7 | |
| 8 | import multiprocessing |
| 9 | |
| 10 | |
class PipelineProcess(multiprocessing.Process):
    """A process that wraps one pipeline stage.

    It continuously pulls tasks from its input queue until a poison pill is
    received. Each task that is pulled is handed to the stage method and then
    placed on the output queue.
    """

    # Sentinel placed on a queue to request shutdown of the consuming stage.
    POISON_PILL = None

    def __init__(self, method, task_queue, result_queue):
        """Set up the input/output queues and the stage method to be called.

        Args:
          method: The actual pipeline stage to be invoked. It is called with
            the task as its only argument; its return value is ignored.
          task_queue: The input task queue for this pipeline stage.
          result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)
        self._method = method
        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        """Pull tasks from the input queue and process them until shutdown.

        Each pulled task is passed to the stage method and the same task
        object is then put on the output queue for the next pipeline stage.

        On receiving the poison pill, the pill is forwarded to the output
        queue so the next stage can terminate as well, and the loop exits.
        """

        while True:
            # Blocks until the previous stage supplies a task or the pill.
            next_task = self._task_queue.get()
            if next_task is self.POISON_PILL:
                # Propagate the shutdown request downstream before exiting.
                self._result_queue.put(self.POISON_PILL)
                break
            # The task itself (not the method's return value) is forwarded;
            # presumably the stage records its outcome on the task object.
            self._method(next_task)
            self._result_queue.put(next_task)