blob: c375dc280805725d923ec5964212615a5a977a90 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Module for task_executor unittests."""
6# pylint: disable=g-bad-import-order
7
8import mock
9import os
10import sys
11import unittest
12
13import task_executor
14
15from google.appengine.api import taskqueue
16from google.appengine.ext import testbed
17
18
class FakeSwarmingLib(object):
  """Test double used in place of swarming_lib.SwarmingRunner.

  Attributes:
    success_num: largest 'num' value that run() accepts without raising.
    dummy_run_count: number of times dummy_run() has been called.
  """

  # sys.maxsize is used instead of sys.maxint because sys.maxint does not
  # exist on Python 3; sys.maxsize is available on Python 2.6+ and 3.x.
  def __init__(self, success_num=sys.maxsize):
    """Initialize the fake runner.

    Args:
      success_num: run() raises ValueError for any task whose 'num' keyword
        is greater than this threshold. The default effectively means that
        run() never fails.
    """
    self.success_num = success_num
    self.dummy_run_count = 0

  def run(self, **suite_kwargs):
    """Pretend to run a suite, failing once 'num' exceeds success_num.

    Args:
      **suite_kwargs: task parameters. Only 'num' is inspected; it is
        converted with int() (so numeric strings are accepted) and treated
        as 0 when missing.

    Raises:
      ValueError: if int(num) is greater than self.success_num.
    """
    if int(suite_kwargs.get('num', 0)) > self.success_num:
      raise ValueError('test')

  def dummy_run(self):
    """Record one dummy run by incrementing dummy_run_count."""
    self.dummy_run_count += 1
31
Xixuan Wu835dee22017-09-07 10:47:29 -070032
class TaskExecutorTestCase(unittest.TestCase):
  """Unit tests for task_executor.push and TaskProcessor.batch_execute.

  Every test runs against an App Engine testbed taskqueue stub that is
  activated in setUp and deactivated on cleanup.
  """

  def setUp(self):
    """Activate the testbed and initialize the taskqueue stub."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.addCleanup(self.testbed.deactivate)

    # root_path must be set to the location of queue.yaml.
    # Otherwise, only the 'default' queue will be available.
    self.testbed.init_taskqueue_stub(root_path=os.path.dirname(__file__))
    self.taskqueue_stub = self.testbed.get_stub(
        testbed.TASKQUEUE_SERVICE_NAME)

  def _push_numbered_tasks(self, count):
    """Push `count` fake suite tasks with 'num' running from 1 to count.

    Args:
      count: number of tasks to push to task_executor.SUITES_QUEUE.
    """
    for num in range(1, count + 1):
      task_executor.push(task_executor.SUITES_QUEUE,
                         suite='fake_suite', num=num)

  def _assert_task_count(self, expected):
    """Assert that the taskqueue stub currently holds `expected` tasks.

    Args:
      expected: the expected number of tasks returned by the stub.
    """
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(len(tasks), expected)

  def testPushTask(self):
    """Test that a pushed task lands in the queue with its parameters."""
    suite_kwargs = {'suite': 'fake_suite'}
    task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(len(tasks), 1)
    self.assertEqual(suite_kwargs, tasks[0].extract_params())

  def testBatchExecuteTaskWithGAETesting(self):
    """Test task_executor execute tasks in batch in testing on GAE."""
    self._push_numbered_tasks(task_executor.BATCH_SIZE)

    with mock.patch('swarming_lib.SwarmingRunner',
                    return_value=FakeSwarmingLib()):
      # Patch the flag with the literal True. Passing return_value=True
      # would instead install a MagicMock, which is only truthy by accident.
      # NOTE(review): assumes GAE_TESTING is a plain module-level flag.
      with mock.patch('global_config.GAE_TESTING', True):
        task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE)
        task_processor.batch_execute()
        self.assertEqual(task_executor.BATCH_SIZE,
                         task_processor.swarming.dummy_run_count)

  def testBatchExecuteTaskSuccessfully(self):
    """Test task_executor successfully execute tasks in batch."""
    extra_num = 10
    total_num = task_executor.BATCH_SIZE + extra_num
    self._push_numbered_tasks(total_num)

    # Before batch_execute
    self._assert_task_count(total_num)
    with mock.patch('swarming_lib.SwarmingRunner',
                    return_value=FakeSwarmingLib()):
      task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE)
      task_processor.batch_execute()
      # After batch_execute succeeds, only extra_num tasks left.
      self._assert_task_count(extra_num)

  def testBatchExecuteTaskFailedSwarmingTotally(self):
    """Test task_executor fails at the beginning, and no tasks are deleted."""
    extra_num = 10
    total_num = task_executor.BATCH_SIZE + extra_num
    self._push_numbered_tasks(total_num)

    # Before batch_execute
    self._assert_task_count(total_num)
    # success_num=0 makes the fake runner reject every task (num starts at 1).
    with mock.patch('swarming_lib.SwarmingRunner',
                    return_value=FakeSwarmingLib(0)):
      task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE)
      self.assertRaises(ValueError, task_processor.batch_execute)
      # After batch_execute fails, no tasks are deleted from task queue.
      self._assert_task_count(total_num)

  def testBatchExecuteTaskFailedSwarmingPartially(self):
    """Test task_executor fails halfway, and only executed tasks are deleted."""
    extra_num = 10
    success_num = 50
    total_num = task_executor.BATCH_SIZE + extra_num
    self._push_numbered_tasks(total_num)

    # Before batch_execute
    self._assert_task_count(total_num)
    with mock.patch('swarming_lib.SwarmingRunner',
                    return_value=FakeSwarmingLib(success_num)):
      task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE)
      self.assertRaises(ValueError, task_processor.batch_execute)
      # After batch_execute fails halfway, only the success_num tasks that
      # ran successfully are deleted from task queue.
      self._assert_task_count(total_num - success_num)

  def testBatchExecuteTaskFailedLeasing(self):
    """Test task_executor fails to lease task."""
    task_executor.push(task_executor.SUITES_QUEUE, suite='fake_suite')

    # The fake runner's threshold is irrelevant here: the test expects
    # batch_execute to raise UnknownQueueError for the unknown queue.
    with mock.patch('swarming_lib.SwarmingRunner',
                    return_value=FakeSwarmingLib()):
      task_processor = task_executor.TaskProcessor('nonExistentQueue')
      self.assertRaises(taskqueue.UnknownQueueError,
                        task_processor.batch_execute)
      # After batch_execute fails, no tasks are deleted from task queue.
      self._assert_task_count(1)
141
142
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
  unittest.main()