Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 1 | """Pipeline process that encapsulates the actual content. |
| 2 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 3 | The actual stages include the builder and the executor. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 4 | """ |
| 5 | |
| 6 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 7 | |
| 8 | import multiprocessing |
| 9 | |
# Sentinel ("poison pill") put on the pipeline queues to tell the receiving
# stage or helper to shut down. Receivers compare incoming items against it
# with `==`, so it must never compare equal to a real task. The specific
# integer was picked at random.
POISONPILL = 975
| 12 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 13 | |
| 14 | class PipelineProcess(multiprocessing.Process): |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 15 | """A process that encapsulates the actual content pipeline stage. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 16 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 17 | The actual pipeline stage can be the builder or the tester. This process |
| 18 | continuously pull tasks from the queue until a poison pill is received. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 19 | Once a job is received, it will hand it to the actual stage for processing. |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 20 | |
| 21 | Each pipeline stage contains three modules. |
| 22 | The first module continuously pulls task from the input queue. It searches the |
| 23 | cache to check whether the task has encountered before. If so, duplicate |
| 24 | computation can be avoided. |
| 25 | The second module consists of a pool of workers that do the actual work, e.g., |
| 26 | the worker will compile the source code and get the image in the builder |
| 27 | pipeline stage. |
| 28 | The third module is a helper that put the result cost to the cost field of the |
| 29 | duplicate tasks. For example, if two tasks are equivalent, only one task, say |
| 30 | t1 will be executed and the other task, say t2 will not be executed. The third |
| 31 | mode gets the result from t1, when it is available and set the cost of t2 to |
| 32 | be the same as that of t1. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 33 | """ |
| 34 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 35 | def __init__(self, num_processes, name, cache, stage, task_queue, helper, |
| 36 | worker, result_queue): |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 37 | """Set up input/output queue and the actual method to be called. |
| 38 | |
| 39 | Args: |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 40 | num_processes: Number of helpers subprocessors this stage has. |
| 41 | name: The name of this stage. |
| 42 | cache: The computed tasks encountered before. |
| 43 | stage: An int value that specifies the stage for this pipeline stage, for |
| 44 | example, build stage or test stage. This value will be used to retrieve |
| 45 | the keys in different stage. I.e., the flags set is the key in build |
| 46 | stage and the checksum is the key in the test stage. The key is used to |
| 47 | detect duplicates. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 48 | task_queue: The input task queue for this pipeline stage. |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 49 | helper: The method hosted by the helper module to fill up the cost of the |
| 50 | duplicate tasks. |
| 51 | worker: The method hosted by the worker pools to do the actual work, e.g., |
| 52 | compile the image. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 53 | result_queue: The output task queue for this pipeline stage. |
| 54 | """ |
| 55 | |
| 56 | multiprocessing.Process.__init__(self) |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 57 | |
| 58 | self._name = name |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 59 | self._task_queue = task_queue |
| 60 | self._result_queue = result_queue |
| 61 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 62 | self._helper = helper |
| 63 | self._worker = worker |
| 64 | |
| 65 | self._cache = cache |
| 66 | self._stage = stage |
| 67 | self._num_processes = num_processes |
| 68 | |
| 69 | # the queues used by the modules for communication |
| 70 | manager = multiprocessing.Manager() |
| 71 | self._helper_queue = manager.Queue() |
| 72 | self._work_queue = manager.Queue() |
| 73 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 74 | def run(self): |
| 75 | """Busy pulling the next task from the queue for execution. |
| 76 | |
| 77 | Once a job is pulled, this stage invokes the actual stage method and submits |
| 78 | the result to the next pipeline stage. |
| 79 | |
| 80 | The process will terminate on receiving the poison pill from previous stage. |
| 81 | """ |
| 82 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 83 | # the worker pool |
Yuheng Long | 5fe6dc8 | 2013-06-18 17:05:08 -0700 | [diff] [blame] | 84 | work_pool = multiprocessing.Pool(self._num_processes) |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 85 | |
| 86 | # the helper process |
| 87 | helper_process = multiprocessing.Process(target=self._helper, |
| 88 | args=(self._cache, |
| 89 | self._helper_queue, |
| 90 | self._work_queue, |
| 91 | self._result_queue)) |
| 92 | helper_process.start() |
| 93 | mycache = self._cache.keys() |
| 94 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 95 | while True: |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 96 | task = self._task_queue.get() |
| 97 | if task == POISONPILL: |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 98 | # Poison pill means shutdown |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 99 | self._result_queue.put(POISONPILL) |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 100 | break |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 101 | |
| 102 | task_key = task.get_key(self._stage) |
| 103 | if task_key in mycache: |
| 104 | # The task has been encountered before. It will be sent to the helper |
| 105 | # module for further processing. |
| 106 | self._helper_queue.put(task) |
| 107 | else: |
| 108 | # Let the workers do the actual work. |
Yuheng Long | 5fe6dc8 | 2013-06-18 17:05:08 -0700 | [diff] [blame] | 109 | work_pool.apply_async(self._worker, args=(task, self._work_queue, |
Yuheng Long | 1dd70cd | 2013-06-19 09:44:24 -0700 | [diff] [blame] | 110 | self._result_queue)) |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 111 | mycache.append(task_key) |
| 112 | |
| 113 | # Shutdown the workers pool and the helper process. |
Yuheng Long | 5fe6dc8 | 2013-06-18 17:05:08 -0700 | [diff] [blame] | 114 | work_pool.close() |
| 115 | work_pool.join() |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 116 | |
| 117 | self._helper_queue.put(POISONPILL) |
| 118 | helper_process.join() |