blob: c5a7b8bbc0c421a6fcb68f6d7a1bf46e04dbb690 [file] [log] [blame]
Yuheng Longf20cffa2013-06-03 18:46:00 -07001"""Pipeline process that encapsulates the actual content.
2
Yuheng Long49358b72013-07-10 14:45:29 -07003Part of the Chrome build flags optimization.
4
Yuheng Longe610c192013-06-11 16:16:34 -07005The actual stages include the builder and the executor.
Yuheng Longf20cffa2013-06-03 18:46:00 -07006"""
7
8__author__ = 'yuhenglong@google.com (Yuheng Long)'
9
10import multiprocessing
11
# Sentinel placed on a queue to tell the consumer to shut down. The value
# itself is an arbitrary integer.
POISONPILL = 975
14
Yuheng Longf20cffa2013-06-03 18:46:00 -070015
class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates the actual content pipeline stage.

  The actual pipeline stage can be the builder or the tester. This process
  continuously pulls tasks from the queue until a poison pill is received.
  Once a job is received, it will hand it to the actual stage for processing.

  Each pipeline stage contains three modules.
  The first module continuously pulls tasks from the input queue. It searches
  the cache to check whether the task has been encountered before. If so,
  duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work,
  e.g., the worker will compile the source code and get the image in the
  builder pipeline stage.
  The third module is a helper that puts the result cost into the cost field of
  the duplicate tasks. For example, if two tasks are equivalent, only one task,
  say t1, will be executed and the other task, say t2, will not be executed.
  The third module gets the result from t1 when it is available and sets the
  cost of t2 to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      num_processes: Number of worker processes in this stage's worker pool.
      name: The name of this stage.
      cache: The computed tasks encountered before. Only its keys() are read
        by this class; the object itself is handed to the helper.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value will be used to retrieve
        the keys in different stage. I.e., the flags set is the key in build
        stage and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill up the cost of the
        duplicate tasks. It is called as
        helper(cache, helper_queue, work_queue, result_queue).
      worker: The method hosted by the worker pools to do the actual work, e.g.,
        compile the image. It is called as
        worker(task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)

    # NOTE: multiprocessing.Process keeps its own name in `_name`, so this
    # assignment presumably also renames the process itself.
    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules for communication.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Busy pulling the next task from the queue for execution.

    Once a job is pulled, this stage either hands it to the helper (when a task
    with the same key has been seen before) or schedules it on the worker pool.

    The process will terminate on receiving the poison pill from the previous
    stage. The poison pill is forwarded to the result queue only after the
    worker pool and the helper have finished, so that every result of this
    stage precedes the pill in the result queue.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()

    # Keys already known to this stage. A set gives constant-time membership
    # tests and, unlike the object returned by keys(), can always be extended
    # (dict.keys() is a read-only view on Python 3).
    seen_keys = set(self._cache.keys())

    try:
      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown.
          break

        task_key = task.get_key(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It will be sent to the helper
          # module for further processing.
          self._helper_queue.put(task)
        else:
          # Let the workers do the actual work.
          work_pool.apply_async(self._worker, args=(task, self._work_queue,
                                                    self._result_queue))
          seen_keys.add(task_key)
    finally:
      # Shutdown the workers pool and the helper process, even if pulling or
      # dispatching a task failed.
      work_pool.close()
      work_pool.join()

      self._helper_queue.put(POISONPILL)
      helper_process.join()

    # Everything this stage produces is now in the result queue; tell the next
    # stage that no more results will follow.
    self._result_queue.put(POISONPILL)