Yuheng Long | 49358b7 | 2013-07-10 14:45:29 -0700 | [diff] [blame^] | 1 | """Pipeline Process unittest. |
| 2 | |
| 3 | Part of the Chrome build flags optimization. |
| 4 | """ |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 5 | |
| 6 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 7 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 8 | import multiprocessing |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 9 | import unittest |
| 10 | |
| 11 | import pipeline_process |
| 12 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 13 | # Pick an integer at random. |
| 14 | ERROR = -334 |
| 15 | |
| 16 | |
| 17 | def MockHelper(done_dict, helper_queue, work_queue, result_queue): |
| 18 | """This method echos input to the output.""" |
| 19 | while True: |
| 20 | if not helper_queue.empty(): |
| 21 | task = helper_queue.get() |
| 22 | if task == pipeline_process.POISONPILL: |
| 23 | # Poison pill means shutdown |
| 24 | break |
| 25 | |
| 26 | if task in done_dict: |
| 27 | # verify that it does not get duplicate "1"s in the test. |
| 28 | result_queue.put(ERROR) |
| 29 | else: |
| 30 | result_queue.put(('helper', task.get_key(0))) |
| 31 | |
| 32 | |
| 33 | def MockWorker(task, buffer_queue, result_queue): |
| 34 | result_queue.put(('worker', task.get_key(0))) |
| 35 | |
| 36 | |
| 37 | class MockTask(object): |
| 38 | def __init__(self, key): |
| 39 | self._key = key |
| 40 | |
| 41 | def get_key(self, stage): |
| 42 | return self._key |
| 43 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 44 | |
| 45 | class PipelineProcessTest(unittest.TestCase): |
| 46 | """This class test the PipelineProcess. |
| 47 | |
| 48 | All the task inserted into the input queue should be taken out and hand to the |
| 49 | actual pipeline handler, except for the POISON_PILL. All these task should |
| 50 | also be passed to the next pipeline stage via the output queue. |
| 51 | """ |
| 52 | |
| 53 | def setUp(self): |
| 54 | pass |
| 55 | |
| 56 | def testRun(self): |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 57 | """Test the run method. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 58 | |
| 59 | Ensure that all the tasks inserted into the queue are properly handled. |
| 60 | """ |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame] | 61 | |
| 62 | manager = multiprocessing.Manager() |
| 63 | inp = manager.Queue() |
| 64 | output = manager.Queue() |
| 65 | |
| 66 | process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp, |
| 67 | MockHelper, MockWorker, output) |
| 68 | |
| 69 | process.start() |
| 70 | inp.put(MockTask(1)) |
| 71 | inp.put(MockTask(1)) |
| 72 | inp.put(MockTask(2)) |
| 73 | inp.put(pipeline_process.POISONPILL) |
| 74 | process.join() |
| 75 | |
| 76 | # All tasks are processed once and only once. |
| 77 | result = [('worker', 1), ('helper', 1), ('worker', 2), |
| 78 | pipeline_process.POISONPILL] |
| 79 | while result: |
| 80 | task = output.get() |
| 81 | |
| 82 | # One "1"s is passed to the worker and one to the helper. |
| 83 | self.assertNotEqual(task, ERROR) |
| 84 | |
| 85 | # The messages received should be exactly the same as the result. |
| 86 | self.assertTrue(task in result) |
| 87 | result.remove(task) |
| 88 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 89 | |
| 90 | if __name__ == '__main__': |
| 91 | unittest.main() |