blob: e40b5a21b47d9fa9732e70712615c5afbd2b79c0 [file] [log] [blame]
Yuheng Longf20cffa2013-06-03 18:46:00 -07001"""Pipeline Process unittest."""
2
3__author__ = 'yuhenglong@google.com (Yuheng Long)'
4
Yuheng Longe610c192013-06-11 16:16:34 -07005import multiprocessing
Yuheng Longf20cffa2013-06-03 18:46:00 -07006import unittest
7
8import pipeline_process
9
Yuheng Longe610c192013-06-11 16:16:34 -070010# Pick an integer at random.
11ERROR = -334
12
13
14def MockHelper(done_dict, helper_queue, work_queue, result_queue):
15 """This method echos input to the output."""
16 while True:
17 if not helper_queue.empty():
18 task = helper_queue.get()
19 if task == pipeline_process.POISONPILL:
20 # Poison pill means shutdown
21 break
22
23 if task in done_dict:
24 # verify that it does not get duplicate "1"s in the test.
25 result_queue.put(ERROR)
26 else:
27 result_queue.put(('helper', task.get_key(0)))
28
29
30def MockWorker(task, buffer_queue, result_queue):
31 result_queue.put(('worker', task.get_key(0)))
32
33
34class MockTask(object):
35 def __init__(self, key):
36 self._key = key
37
38 def get_key(self, stage):
39 return self._key
40
Yuheng Longf20cffa2013-06-03 18:46:00 -070041
42class PipelineProcessTest(unittest.TestCase):
43 """This class test the PipelineProcess.
44
45 All the task inserted into the input queue should be taken out and hand to the
46 actual pipeline handler, except for the POISON_PILL. All these task should
47 also be passed to the next pipeline stage via the output queue.
48 """
49
50 def setUp(self):
51 pass
52
53 def testRun(self):
Yuheng Longe610c192013-06-11 16:16:34 -070054 """Test the run method.
Yuheng Longf20cffa2013-06-03 18:46:00 -070055
56 Ensure that all the tasks inserted into the queue are properly handled.
57 """
Yuheng Longe610c192013-06-11 16:16:34 -070058
59 manager = multiprocessing.Manager()
60 inp = manager.Queue()
61 output = manager.Queue()
62
63 process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp,
64 MockHelper, MockWorker, output)
65
66 process.start()
67 inp.put(MockTask(1))
68 inp.put(MockTask(1))
69 inp.put(MockTask(2))
70 inp.put(pipeline_process.POISONPILL)
71 process.join()
72
73 # All tasks are processed once and only once.
74 result = [('worker', 1), ('helper', 1), ('worker', 2),
75 pipeline_process.POISONPILL]
76 while result:
77 task = output.get()
78
79 # One "1"s is passed to the worker and one to the helper.
80 self.assertNotEqual(task, ERROR)
81
82 # The messages received should be exactly the same as the result.
83 self.assertTrue(task in result)
84 result.remove(task)
85
Yuheng Longf20cffa2013-06-03 18:46:00 -070086
87if __name__ == '__main__':
88 unittest.main()