blob: 32bbaa99bdf6c14ddcecc30e43db9526944bfccb [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for task_executor unittests."""
6# pylint: disable=g-bad-import-order
7
8import mock
9import os
10import sys
11import unittest
12
13import task_executor
14
15from google.appengine.api import taskqueue
16from google.appengine.ext import testbed
17
18
19class FakeSwarmingLib(object):
20
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070021 def __init__(self, success_num=sys.maxint, error_num=0):
Xixuan Wu835dee22017-09-07 10:47:29 -070022 self.success_num = success_num
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070023 self.error_num = error_num
Xixuan Wua5a29442017-10-11 11:03:02 -070024 self.dummy_run_count = 0
Xixuan Wu835dee22017-09-07 10:47:29 -070025
26 def run(self, **suite_kwargs):
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070027 num = int(suite_kwargs.get('num', 0))
28 if num > self.success_num and num <= self.success_num + self.error_num:
Xixuan Wu835dee22017-09-07 10:47:29 -070029 raise ValueError('test')
30
Xixuan Wua5a29442017-10-11 11:03:02 -070031 def dummy_run(self):
32 self.dummy_run_count += 1
33
Xixuan Wu835dee22017-09-07 10:47:29 -070034
35class TaskExecutorTestCase(unittest.TestCase):
36
37 def setUp(self):
38 self.testbed = testbed.Testbed()
39 self.testbed.activate()
40 self.addCleanup(self.testbed.deactivate)
41
42 # root_path must be set the location of queue.yaml.
43 # Otherwise, only the 'default' queue will be available.
44 self.testbed.init_taskqueue_stub(
45 root_path=os.path.join(os.path.dirname(__file__)))
46 self.taskqueue_stub = self.testbed.get_stub(
47 testbed.TASKQUEUE_SERVICE_NAME)
48
49 def testPushTask(self):
50 suite_kwargs = {'suite': 'fake_suite'}
51 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
52 tasks = self.taskqueue_stub.get_filtered_tasks()
53 self.assertEqual(len(tasks), 1)
54 self.assertEqual(suite_kwargs, tasks[0].extract_params())
55
Xixuan Wua5a29442017-10-11 11:03:02 -070056 def testBatchExecuteTaskWithGAETesting(self):
57 """Test task_executor execute tasks in batch in testing on GAE."""
58 suite_kwargs = {'suite': 'fake_suite'}
59 for i in range(task_executor.BATCH_SIZE):
60 suite_kwargs['num'] = i + 1
61 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
62
63 with mock.patch('swarming_lib.SwarmingRunner',
64 return_value=FakeSwarmingLib()):
65 with mock.patch('global_config.GAE_TESTING', return_value=True):
Craig Bergstrom3212fb42018-04-17 19:24:15 -060066 task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE,
67 'invalidhostname')
Xixuan Wua5a29442017-10-11 11:03:02 -070068 task_processor.batch_execute()
69 self.assertEqual(task_executor.BATCH_SIZE,
70 task_processor.swarming.dummy_run_count)
71
Xixuan Wu835dee22017-09-07 10:47:29 -070072 def testBatchExecuteTaskSuccessfully(self):
73 """Test task_executor successfully execute tasks in batch."""
74 suite_kwargs = {'suite': 'fake_suite'}
75 extra_num = 10
76 for i in range(task_executor.BATCH_SIZE + extra_num):
77 suite_kwargs['num'] = i + 1
78 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
79
80 # Before batch_execute
81 tasks = self.taskqueue_stub.get_filtered_tasks()
82 self.assertEqual(len(tasks), task_executor.BATCH_SIZE + extra_num)
83 with mock.patch('swarming_lib.SwarmingRunner',
84 return_value=FakeSwarmingLib()):
Craig Bergstrom3212fb42018-04-17 19:24:15 -060085 task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE,
86 'invalidhostname')
Xixuan Wu835dee22017-09-07 10:47:29 -070087 task_processor.batch_execute()
88 # After batch_execute succeeds, only extra_num tasks left.
89 tasks = self.taskqueue_stub.get_filtered_tasks()
90 self.assertEqual(len(tasks), extra_num)
91
92 def testBatchExecuteTaskFailedSwarmingTotally(self):
93 """Test task_executor fails at the beginning, and no tasks are deleted."""
94 suite_kwargs = {'suite': 'fake_suite'}
95 extra_num = 10
96 for i in range(task_executor.BATCH_SIZE + extra_num):
97 suite_kwargs['num'] = i + 1
98 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
99
100 # Before batch_execute
101 tasks = self.taskqueue_stub.get_filtered_tasks()
102 self.assertEqual(len(tasks), task_executor.BATCH_SIZE + extra_num)
103 with mock.patch('swarming_lib.SwarmingRunner',
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700104 return_value=FakeSwarmingLib(0, error_num=len(tasks))):
Craig Bergstrom3212fb42018-04-17 19:24:15 -0600105 task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE,
106 'invalidhostname')
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700107 task_processor.batch_execute()
108 # After batch_execute, no tasks are deleted from task queue, due
109 # to they're all failed to kick off.
Xixuan Wu835dee22017-09-07 10:47:29 -0700110 tasks = self.taskqueue_stub.get_filtered_tasks()
111 self.assertEqual(len(tasks), task_executor.BATCH_SIZE + extra_num)
112
113 def testBatchExecuteTaskFailedSwarmingPartially(self):
114 """Test task_executor fails halfway, and only executed tasks are deleted."""
115 suite_kwargs = {'suite': 'fake_suite'}
116 extra_num = 10
117 success_num = 50
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700118 error_num = 5
Xixuan Wu835dee22017-09-07 10:47:29 -0700119 for i in range(task_executor.BATCH_SIZE + extra_num):
120 suite_kwargs['num'] = i + 1
121 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
122
123 # Before batch_execute
124 tasks = self.taskqueue_stub.get_filtered_tasks()
125 self.assertEqual(len(tasks), task_executor.BATCH_SIZE + extra_num)
126 with mock.patch('swarming_lib.SwarmingRunner',
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700127 return_value=FakeSwarmingLib(success_num, error_num)):
Craig Bergstrom3212fb42018-04-17 19:24:15 -0600128 task_processor = task_executor.TaskProcessor(task_executor.SUITES_QUEUE,
129 'invalidhostname')
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700130 task_processor.batch_execute()
131 # After batch_execute, only failed suites and extra suites are
132 # kept in task queue.
Xixuan Wu835dee22017-09-07 10:47:29 -0700133 tasks = self.taskqueue_stub.get_filtered_tasks()
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -0700134 self.assertEqual(len(tasks), error_num + extra_num)
Xixuan Wu835dee22017-09-07 10:47:29 -0700135
136 def testBatchExecuteTaskFailedLeasing(self):
137 """Test task_executor fails to lease task."""
138 suite_kwargs = {'suite': 'fake_suite'}
139 task_executor.push(task_executor.SUITES_QUEUE, **suite_kwargs)
140
141 with mock.patch('swarming_lib.SwarmingRunner',
142 return_value=FakeSwarmingLib(False)):
Craig Bergstrom3212fb42018-04-17 19:24:15 -0600143 task_processor = task_executor.TaskProcessor('nonExistentQueue',
144 'invalidhostname')
Xixuan Wu835dee22017-09-07 10:47:29 -0700145 self.assertRaises(taskqueue.UnknownQueueError,
146 task_processor.batch_execute)
147 # After batch_execute fails, no tasks are deleted from task queue.
148 tasks = self.taskqueue_stub.get_filtered_tasks()
149 self.assertEqual(len(tasks), 1)
150
151
152if __name__ == '__main__':
153 unittest.main()