blob: 29d88f521d7094191b53ab73ab77daf0e8d5a282 [file] [log] [blame]
"""Pipeline process wrapping one concrete stage of the pipeline.

Concrete stages are, for example, the builder and the executor.
"""

__author__ = "yuhenglong@google.com (Yuheng Long)"
7
8import multiprocessing
9
# Shutdown sentinel passed through the stage queues: a stage that reads it
# stops pulling tasks. It is a plain int, presumably so that it still compares
# equal (==) after being pickled across process boundaries; tasks must never
# compare equal to it.
POISONPILL = 975
12
Yuheng Longf20cffa2013-06-03 18:46:00 -070013
class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates one actual pipeline stage.

  The actual pipeline stage can be, for example, the builder or the tester.
  This process continuously pulls tasks from its input queue until a poison
  pill is received. Once a task is received, it is handed to the actual stage
  for processing.

  Each pipeline stage contains three modules:
  1. A dispatcher (the run loop of this process) that pulls tasks from the
     input queue and checks the cache to see whether an equivalent task has
     been encountered before. If so, duplicate computation is avoided.
  2. A pool of workers that do the actual work, e.g., compile the source code
     and get the image in the builder pipeline stage.
  3. A helper that puts the result cost into the cost field of duplicate
     tasks. For example, if tasks t1 and t2 are equivalent, only t1 is
     executed. The helper gets the result of t1 when it is available and sets
     the cost of t2 to be the same as that of t1.

  On shutdown, the poison pill is forwarded to the next stage only after the
  worker pool and the helper have finished. This way, every result this stage
  writes to the result queue is ahead of the pill.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up the input/output queues and the callables run by each module.

    Args:
      num_processes: Number of worker processes in this stage's worker pool.
      name: The name of this stage.
      cache: The computed tasks encountered before. Its keys are the task keys
        (the values returned by task.get_key(stage)) already seen.
      stage: An int value that specifies which pipeline stage this is, for
        example, build stage or test stage. It is used to retrieve the task
        key for this stage, e.g., the flag set is the key in the build stage
        and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The callable run in the helper process to fill in the cost of
        duplicate tasks. It is called as
        helper(cache, helper_queue, work_queue, result_queue) and must return
        once it receives POISONPILL on helper_queue.
      worker: The callable run by the worker pool to do the actual work, e.g.,
        compile the image. It is called as
        worker(task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """
    multiprocessing.Process.__init__(self)

    # NOTE(review): in CPython, multiprocessing.Process keeps its `name`
    # property in self._name, so this presumably also renames the process.
    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules to communicate. Manager-backed queue
    # proxies are used, presumably because (unlike multiprocessing.Queue)
    # they can be passed as arguments to Pool.apply_async.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Dispatch tasks from the input queue until the poison pill arrives.

    A task whose key has not been seen before is submitted to the worker
    pool. A task whose key is already in the cache, or was submitted earlier
    in this run, is sent to the helper module instead.

    On receiving the poison pill (or if dispatching raises), the worker pool
    and the helper process are drained and shut down. Only after that is the
    poison pill forwarded to the next stage through the result queue.
    """
    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()

    # Snapshot of the keys seen so far. A set gives O(1) membership tests and
    # supports add(); dict.keys() on Python 3 is a view without append().
    seen_keys = set(self._cache.keys())

    try:
      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown. It is forwarded downstream in the
          # finally clause, after this stage has flushed its own results.
          break

        task_key = task.get_key(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It will be sent to the
          # helper module for further processing.
          self._helper_queue.put(task)
        else:
          # Let the workers do the actual work.
          # NOTE(review): the AsyncResult is discarded, so an exception
          # raised inside the worker is silently lost.
          work_pool.apply_async(self._worker,
                                args=(task, self._work_queue,
                                      self._result_queue))
          seen_keys.add(task_key)
    finally:
      # Shut down the worker pool and the helper process first. Both are
      # handed the result queue, so they may still be writing results to it.
      work_pool.close()
      work_pool.join()

      self._helper_queue.put(POISONPILL)
      helper_process.join()

      # Only now tell the next stage to stop, so that it sees every result.
      self._result_queue.put(POISONPILL)