blob: 32c6ddc3e0a59db651ccb035e1002a6e5e76704e [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Prathmesh Prabhue8182312020-03-06 23:33:07 -08008import collections
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wu835dee22017-09-07 10:47:29 -070013
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070014import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070015from google.appengine.api import taskqueue
16from google.appengine.runtime import apiproxy_errors
17
18
# Name of the pull queue that new_task_processor() reads suite tasks from.
SUITES_QUEUE = 'suitesQueue'

# Knobs consumed by TaskProcessor:
#   batch_size: upper bound on the number of tasks kicked off by a single
#       batch_execute() call.
#   multirequest_size: number of tasks leased from the queue (and sent) per
#       request.
#   per_suite_multirequest_size: dict mapping a task tag (used as the suite
#       name) to a smaller per-request limit that overrides multirequest_size.
Options = collections.namedtuple(
    'Options',
    ('batch_size', 'multirequest_size', 'per_suite_multirequest_size'),
)

_DEFAULT_OPTIONS = Options(
    batch_size=100,
    multirequest_size=constants.Buildbucket.MULTIREQUEST_SIZE,
    # Suites whose requests must carry fewer tasks than the default.
    per_suite_multirequest_size={
        # crbug.com/1028732: Produces more results than fit in BuildBucket
        # output properties.
        'arc-gts': 10,
        # crbug.com/1028732: Produces more results than fit in BuildBucket
        # output properties.
        'graphics_per-day': 10,
    },
)
Prathmesh Prabhue8182312020-03-06 23:33:07 -080037
Xixuan Wu835dee22017-09-07 10:47:29 -070038
def new_task_processor():
  """Build a TaskProcessor wired up for the current running environment.

  The Buildbucket builder is PROD_BUILDER only when both the running
  environment is ENV_PROD and the application id is PROD_APP; in every other
  case STAGING_BUILDER is used.

  Returns:
    A TaskProcessor reading from SUITES_QUEUE with _DEFAULT_OPTIONS.
  """
  in_production = (
      constants.environment() == constants.RunningEnv.ENV_PROD
      and constants.application_id() == constants.AppID.PROD_APP)
  if in_production:
    builder = constants.Buildbucket.PROD_BUILDER
  else:
    builder = constants.Buildbucket.STAGING_BUILDER

  client = buildbucket.TestPlatformClient(
      constants.Buildbucket.HOST,
      constants.Buildbucket.PROJECT,
      constants.Buildbucket.BUCKET,
      builder)
  return TaskProcessor(
      queue_name=SUITES_QUEUE,
      test_platform_client=client,
      options=_DEFAULT_OPTIONS)
Prathmesh Prabhu46331482020-03-07 00:18:10 -080054
55
class TaskProcessor(object):
  """Kicks off suites for the tasks waiting in a pull queue.

  Tasks are leased from the pull queue in groups, handed to the test platform
  client as one multirequest, and deleted from the queue once they have been
  kicked off.
  """

  def __init__(self, queue_name, test_platform_client, options):
    """Set up the processor; no tasks are leased until batch_execute().

    Args:
      queue_name: The name of a pull queue.
      test_platform_client: A buildbucket.TestPlatformClient object.
      options: An Options tuple configuring batch and request sizes.
    """
    self.test_platform_client = test_platform_client
    self._options = options
    self.queue = taskqueue.Queue(queue_name)

  def batch_execute(self):
    """Lease tasks group by group and kick them off.

    Each round leases at most `multirequest_size` tasks for 3600 seconds and
    sends them as a single multirequest. The loop ends when the queue returns
    no tasks, or when one more full-size round could push the number of
    kicked-off tasks past `batch_size`.

    Kick-off failures (ValueError, buildbucket.BuildbucketRunError,
    apiclient.errors.HttpError) are logged and the loop moves on; the affected
    tasks are not deleted here.

    Raises:
      taskqueue.UnknownQueueError, taskqueue.TransientError,
      apiproxy_errors.DeadlineExceededError: if leasing fails; the error is
        logged and re-raised.
    """
    options = self._options
    kicked_off_total = 0
    while kicked_off_total + options.multirequest_size <= options.batch_size:
      try:
        leased = self.queue.lease_tasks_by_tag(
            3600, options.multirequest_size, deadline=60)
      except (taskqueue.UnknownQueueError,
              taskqueue.TransientError,
              apiproxy_errors.DeadlineExceededError) as lease_error:
        logging.exception(lease_error)
        raise

      # An empty lease means there is nothing left to do.
      if not leased:
        return

      batch = self._limit_heavy_tasks(leased)
      kicked_off = []
      try:
        kicked_off.extend(
            self.test_platform_client.multirequest_run(batch, _suite(batch)))
      except (ValueError,
              buildbucket.BuildbucketRunError,
              apiclient.errors.HttpError):
        logging.exception('Failed to kick off %d tasks for suite %s',
                          len(batch), _suite(batch))
      finally:
        # Runs on success and on failure alike, so whatever has been recorded
        # in `kicked_off` is always counted and removed from the queue.
        if kicked_off:
          kicked_off_total += len(kicked_off)
          logging.info('Successfully kicking %d tasks for suite %s',
                       len(kicked_off), _suite(batch))
          self.queue.delete_tasks(kicked_off)

  def purge(self):
    """Remove every task from the task queue."""
    self.queue.purge()

  def _limit_heavy_tasks(self, tasks):
    """Trim a leased group down to the per-suite request limit.

    The limit is looked up in `per_suite_multirequest_size` by the suite of
    `tasks`, falling back to `multirequest_size`. Tasks beyond the limit get
    their lease set to 0 so they are not held by this processor.

    Args:
      tasks: The leased tasks; may be empty.

    Returns:
      The leading tasks that fit within the limit (`tasks` itself when empty).
    """
    if not tasks:
      return tasks
    options = self._options
    cap = options.per_suite_multirequest_size.get(
        _suite(tasks), options.multirequest_size)
    kept, released = tasks[:cap], tasks[cap:]
    for surplus in released:
      self.queue.modify_task_lease(surplus, 0)
    return kept
126
Xixuan Wu835dee22017-09-07 10:47:29 -0700127
def push(queue_name, tag=None, **suite_kwargs):
  """Enqueue one suite as a pull task so it can be kicked off later.

  Args:
    queue_name: the name of a pull queue.
    tag: tag to attach to the pull queue task.
    **suite_kwargs: the args for the suite to kick off; stored as the task's
      params.
  """
  target_queue = taskqueue.Queue(queue_name)
  pull_task = taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs)
  target_queue.add(pull_task)
Prathmesh Prabhu55c40032020-03-07 00:45:15 -0800138
139
def _suite(tasks):
  """Return the tag of the first task, which callers use as the suite name.

  Args:
    tasks: A non-empty sequence of objects with a `tag` attribute; indexing
      an empty sequence raises IndexError.
  """
  first_task = tasks[0]
  return first_task.tag