Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 1 | """Pipeline Process unittest.""" |
| 2 | |
| 3 | __author__ = 'yuhenglong@google.com (Yuheng Long)' |
| 4 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame^] | 5 | import multiprocessing |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 6 | import unittest |
| 7 | |
| 8 | import pipeline_process |
| 9 | |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame^] | 10 | # Pick an integer at random. |
| 11 | ERROR = -334 |
| 12 | |
| 13 | |
| 14 | def MockHelper(done_dict, helper_queue, work_queue, result_queue): |
| 15 | """This method echos input to the output.""" |
| 16 | while True: |
| 17 | if not helper_queue.empty(): |
| 18 | task = helper_queue.get() |
| 19 | if task == pipeline_process.POISONPILL: |
| 20 | # Poison pill means shutdown |
| 21 | break |
| 22 | |
| 23 | if task in done_dict: |
| 24 | # verify that it does not get duplicate "1"s in the test. |
| 25 | result_queue.put(ERROR) |
| 26 | else: |
| 27 | result_queue.put(('helper', task.get_key(0))) |
| 28 | |
| 29 | |
| 30 | def MockWorker(task, buffer_queue, result_queue): |
| 31 | result_queue.put(('worker', task.get_key(0))) |
| 32 | |
| 33 | |
| 34 | class MockTask(object): |
| 35 | def __init__(self, key): |
| 36 | self._key = key |
| 37 | |
| 38 | def get_key(self, stage): |
| 39 | return self._key |
| 40 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 41 | |
| 42 | class PipelineProcessTest(unittest.TestCase): |
| 43 | """This class test the PipelineProcess. |
| 44 | |
| 45 | All the task inserted into the input queue should be taken out and hand to the |
| 46 | actual pipeline handler, except for the POISON_PILL. All these task should |
| 47 | also be passed to the next pipeline stage via the output queue. |
| 48 | """ |
| 49 | |
| 50 | def setUp(self): |
| 51 | pass |
| 52 | |
| 53 | def testRun(self): |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame^] | 54 | """Test the run method. |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 55 | |
| 56 | Ensure that all the tasks inserted into the queue are properly handled. |
| 57 | """ |
Yuheng Long | e610c19 | 2013-06-11 16:16:34 -0700 | [diff] [blame^] | 58 | |
| 59 | manager = multiprocessing.Manager() |
| 60 | inp = manager.Queue() |
| 61 | output = manager.Queue() |
| 62 | |
| 63 | process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp, |
| 64 | MockHelper, MockWorker, output) |
| 65 | |
| 66 | process.start() |
| 67 | inp.put(MockTask(1)) |
| 68 | inp.put(MockTask(1)) |
| 69 | inp.put(MockTask(2)) |
| 70 | inp.put(pipeline_process.POISONPILL) |
| 71 | process.join() |
| 72 | |
| 73 | # All tasks are processed once and only once. |
| 74 | result = [('worker', 1), ('helper', 1), ('worker', 2), |
| 75 | pipeline_process.POISONPILL] |
| 76 | while result: |
| 77 | task = output.get() |
| 78 | |
| 79 | # One "1"s is passed to the worker and one to the helper. |
| 80 | self.assertNotEqual(task, ERROR) |
| 81 | |
| 82 | # The messages received should be exactly the same as the result. |
| 83 | self.assertTrue(task in result) |
| 84 | result.remove(task) |
| 85 | |
Yuheng Long | f20cffa | 2013-06-03 18:46:00 -0700 | [diff] [blame] | 86 | |
| 87 | if __name__ == '__main__': |
| 88 | unittest.main() |