Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 1 | """Pipeline process that encapsulates the actual content. |
| 2 | |
Yuheng Long | 49358b7 | 2013-07-10 14:45:29 -0700 | [diff] [blame^] | 3 | Part of the Chrome build flags optimization. |
| 4 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 5 | The actual stages include the builder and the executor. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 6 | """ |
| 7 | |
| 8 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 9 | |
| 10 | import multiprocessing |
| 11 | |
# Sentinel put on a queue to tell its consumer to shut down. The value is an
# arbitrary integer; tasks are compared against it with ==.
| 13 | POISONPILL = 975 |
| 14 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 15 | |
class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates one stage of the content pipeline.

  The actual pipeline stage can be the builder or the tester. This process
  continuously pulls tasks from the input queue until a poison pill is
  received. Once a task is received, it is handed to the actual stage for
  processing.

  Each pipeline stage contains three modules.
  The first module (the loop in run()) continuously pulls tasks from the input
  queue. It searches the cache to check whether the task has been encountered
  before. If so, duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work,
  e.g., the worker will compile the source code and get the image in the
  builder pipeline stage.
  The third module is a helper that puts the result cost into the cost field
  of the duplicate tasks. For example, if two tasks are equivalent, only one
  task, say t1, will be executed and the other task, say t2, will not be
  executed. The third module gets the result from t1 when it is available and
  sets the cost of t2 to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      num_processes: Number of worker processes in this stage's worker pool.
      name: The name of this stage.
      cache: The computed tasks encountered before. Only its keys() are read
        by this class; the object itself is passed on to the helper.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value will be used to retrieve
        the keys in different stage. I.e., the flags set is the key in build
        stage and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill up the cost of the
        duplicate tasks. It is called as
        helper(cache, helper_queue, work_queue, result_queue).
      worker: The method hosted by the worker pools to do the actual work, e.g.,
        compile the image. It is called as
        worker(task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)

    # multiprocessing.Process keeps its own name in _name, so this assignment
    # also becomes the name reported by the process object.
    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules for communication. They are manager
    # queues (proxies) so they can be handed to the pool workers and to the
    # helper process as plain arguments.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Pull tasks from the input queue and dispatch them until shutdown.

    A task whose key has not been seen is submitted to the worker pool; a task
    whose key is already known (from the cache or from an earlier task in this
    run) is sent to the helper instead.

    The process terminates on receiving the poison pill from the previous
    stage. The pill is forwarded to the result queue only after the worker pool
    and the helper have finished, so that it cannot overtake results that are
    still being produced. The same shutdown sequence runs if dispatching raises,
    so the pool and the helper process are not left behind.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(
        target=self._helper,
        args=(self._cache, self._helper_queue, self._work_queue,
              self._result_queue))
    helper_process.start()

    # Snapshot of the keys already computed, extended with every key
    # dispatched during this run. A set gives constant-time membership tests
    # and works whether keys() returns a list or a view. Keys are assumed to
    # be hashable, as they must be to match the keys of the cache.
    seen_keys = set(self._cache.keys())

    try:
      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown.
          break

        task_key = task.get_key(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It will be sent to the
          # helper module for further processing.
          self._helper_queue.put(task)
        else:
          # Let the workers do the actual work. The AsyncResult is discarded,
          # so an exception raised inside the worker is not reported here.
          work_pool.apply_async(self._worker,
                                args=(task, self._work_queue,
                                      self._result_queue))
          seen_keys.add(task_key)
    finally:
      # Shut down the worker pool first so that every submitted task has run
      # to completion before the helper is told to stop.
      work_pool.close()
      work_pool.join()

      self._helper_queue.put(POISONPILL)
      helper_process.join()

      # Everything this stage will ever emit has been produced by now; only
      # at this point is it safe to tell the next stage to shut down.
      self._result_queue.put(POISONPILL)