blob: 370f6b7448906d88fc3773f8793a0421324f6ba3 [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Prathmesh Prabhue8182312020-03-06 23:33:07 -08008import collections
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wu835dee22017-09-07 10:47:29 -070013
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070014import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070015from google.appengine.api import taskqueue
16from google.appengine.runtime import apiproxy_errors
17
18
# Name of the pull queue that new_task_processor() hands to TaskProcessor.
SUITES_QUEUE = 'suitesQueue'

# Tuning knobs for a TaskProcessor:
#   batch_size: bound used by TaskProcessor.batch_execute() on how many tasks
#     are kicked off in a single call.
#   multirequest_size: number of tasks requested from the queue per lease.
Options = collections.namedtuple('Options', 'batch_size multirequest_size')

_DEFAULT_OPTIONS = Options(
    batch_size=100,
    multirequest_size=constants.Buildbucket.MULTIREQUEST_SIZE)
28
Xixuan Wu835dee22017-09-07 10:47:29 -070029
def new_task_processor():
  """Build a TaskProcessor configured for the current environment.

  Tests are scheduled to PROD_BUILDER only when running in the production
  environment under the production app ID; in every other case
  STAGING_BUILDER is used.

  Returns:
    A TaskProcessor reading from SUITES_QUEUE with the default options.
  """
  runs_in_prod = (
      constants.environment() == constants.RunningEnv.ENV_PROD and
      constants.application_id() == constants.AppID.PROD_APP)
  if runs_in_prod:
    builder = constants.Buildbucket.PROD_BUILDER
  else:
    builder = constants.Buildbucket.STAGING_BUILDER

  client = buildbucket.TestPlatformClient(
      constants.Buildbucket.HOST,
      constants.Buildbucket.PROJECT,
      constants.Buildbucket.BUCKET,
      builder)
  return TaskProcessor(
      queue_name=SUITES_QUEUE,
      test_platform_client=client,
      options=_DEFAULT_OPTIONS)
Prathmesh Prabhu46331482020-03-07 00:18:10 -080045
46
class TaskProcessor(object):
  """Kicks off suites for tasks waiting in a pull queue.

  Tasks are leased from the pull queue in groups, and each group is handed to
  the test platform client, which kicks off the suites described by the
  tasks' params.
  """

  def __init__(self, queue_name, test_platform_client, options):
    """Set up the queue handle, client and options used for execution.

    Args:
      queue_name: The name of a pull queue.
      test_platform_client: A buildbucket.TestPlatformClient object.
      options: Options to configure the task processor; batch_size and
        multirequest_size are read from it.
    """
    self.test_platform_client = test_platform_client
    self._options = options
    self.queue = taskqueue.Queue(queue_name)

  def batch_execute(self):
    """Lease tasks group by group and kick off their suites.

    Each round leases up to multirequest_size tasks (lease of 3600 seconds)
    and passes them, together with the tag of the first leased task, to the
    test platform client. The tasks returned by the client are counted as
    executed and deleted from the queue. The method returns once a lease
    yields no tasks, or once one more full round could push the executed
    count past batch_size.

    Raises:
      taskqueue.UnknownQueueError, taskqueue.TransientError,
      apiproxy_errors.DeadlineExceededError: leasing failed; the error is
        logged and re-raised.
    """
    request_size = self._options.multirequest_size
    batch_limit = self._options.batch_size
    kicked_off_total = 0

    while kicked_off_total + request_size <= batch_limit:
      try:
        leased = self.queue.lease_tasks_by_tag(
            3600, request_size, deadline=60)
      except (taskqueue.UnknownQueueError,
              taskqueue.TransientError,
              apiproxy_errors.DeadlineExceededError) as lease_error:
        logging.exception(lease_error)
        raise

      if not leased:
        # No task could be leased, so there is nothing more to do.
        return

      suite_tag = leased[0].tag
      kicked_off = []
      try:
        # In-place extension: whatever was collected before an error is still
        # visible to the finally clause below.
        kicked_off += self.test_platform_client.multirequest_run(
            leased, suite_tag)
      except (ValueError,
              buildbucket.BuildbucketRunError,
              apiclient.errors.HttpError):
        # NOTE: a failed group does not advance kicked_off_total, so the loop
        # goes on to lease the next group.
        logging.exception('Failed to kick off %d tasks for suite %s',
                          len(leased), suite_tag)
      finally:
        if kicked_off:
          kicked_off_total += len(kicked_off)
          logging.info('Successfully kicking %d tasks for suite %s',
                       len(kicked_off), suite_tag)
          # Only the tasks that were kicked off are removed; the others stay
          # in the queue under their lease.
          self.queue.delete_tasks(kicked_off)

  def purge(self):
    """Purge all tasks from the task queue."""
    self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -0700102
103
def push(queue_name, tag=None, **suite_kwargs):
  """Add a suite to a pull queue so it can be kicked off later.

  Args:
    queue_name: the name of a pull queue.
    tag: tag of a pull queue task.
    **suite_kwargs: the args for a suite to kick off; passed as the task's
      params.
  """
  suite_queue = taskqueue.Queue(queue_name)
  suite_task = taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs)
  suite_queue.add(suite_task)
Xinan Lin9e4917d2019-11-04 10:58:47 -0800113 queue.add(taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs))