blob: 837ed76aefc7f323bc0fdd614d6886e61947da5c [file] [log] [blame]
Yuheng Long49358b72013-07-10 14:45:29 -07001"""Pipeline Process unittest.
2
3Part of the Chrome build flags optimization.
4"""
Yuheng Longf20cffa2013-06-03 18:46:00 -07005
6__author__ = 'yuhenglong@google.com (Yuheng Long)'
7
Yuheng Longe610c192013-06-11 16:16:34 -07008import multiprocessing
Yuheng Longf20cffa2013-06-03 18:46:00 -07009import unittest
10
11import pipeline_process
12
# Sentinel that MockHelper reports when it receives a task that is already in
# done_dict; the test asserts it never shows up. The value itself is arbitrary.
14ERROR = -334
15
16
def MockHelper(done_dict, helper_queue, work_queue, result_queue):
  """Echo each task read from helper_queue onto result_queue.

  Runs until the poison pill is read from helper_queue.

  Args:
    done_dict: Container checked for membership of every incoming task. A
      task that is already in it is reported as ERROR instead of being echoed.
    helper_queue: Queue the tasks, and finally the poison pill, are read from.
    work_queue: Not used by this mock.
    result_queue: Queue that receives ('helper', key) for each task that is
      not in done_dict, or ERROR for one that is.
  """
  while True:
    # Block until an item is available. Polling empty() before get() spins
    # the CPU while the queue is idle and is a check-then-act race.
    task = helper_queue.get()

    if task == pipeline_process.POISONPILL:
      # Poison pill means shutdown.
      break

    if task in done_dict:
      # Verify that it does not get duplicate "1"s in the test.
      result_queue.put(ERROR)
    else:
      result_queue.put(('helper', task.get_key(0)))
31
32
def MockWorker(task, buffer_queue, result_queue):
  """Report the task on result_queue, tagged as handled by the worker.

  Args:
    task: Object whose get_key(0) value identifies it in the report.
    buffer_queue: Not used by this mock.
    result_queue: Queue that receives the ('worker', key) tuple.
  """
  worker_record = ('worker', task.get_key(0))
  result_queue.put(worker_record)
35
36
class MockTask(object):
  """Minimal task stand-in that carries a single key."""

  def __init__(self, key):
    """Remember the key this task reports."""
    self._key = key

  def get_key(self, stage):
    """Return the stored key; the stage argument is ignored."""
    return self._key
43
Yuheng Longf20cffa2013-06-03 18:46:00 -070044
class PipelineProcessTest(unittest.TestCase):
  """Tests for PipelineProcess.

  Every task put on the input queue should be taken out and handed to the
  actual pipeline handler, except for the POISONPILL. All of these tasks
  should also be passed to the next pipeline stage via the output queue.
  """

  # Upper bound, in seconds, on the wait for each expected output message. A
  # missing message then fails the test instead of hanging the run forever.
  _OUTPUT_TIMEOUT = 10

  def testRun(self):
    """Test the run method.

    Ensure that all the tasks inserted into the queue are properly handled.
    """

    manager = multiprocessing.Manager()
    # Stop the manager's server process once the test is over, pass or fail.
    self.addCleanup(manager.shutdown)
    inp = manager.Queue()
    output = manager.Queue()

    process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp,
                                               MockHelper, MockWorker, output)

    process.start()
    inp.put(MockTask(1))
    inp.put(MockTask(1))
    inp.put(MockTask(2))
    inp.put(pipeline_process.POISONPILL)
    process.join()

    # All tasks are processed once and only once.
    expected = [('worker', 1), ('helper', 1), ('worker', 2),
                pipeline_process.POISONPILL]
    while expected:
      # Bounded wait: raises the queue Empty exception if a message never
      # arrives, rather than blocking indefinitely.
      task = output.get(timeout=self._OUTPUT_TIMEOUT)

      # One "1" is passed to the worker and one to the helper.
      self.assertNotEqual(task, ERROR)

      # The messages received should be exactly the expected ones.
      self.assertIn(task, expected)
      expected.remove(task)
88
Yuheng Longf20cffa2013-06-03 18:46:00 -070089
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()