blob: 0ab8466d10794f57a7652be42bb90b821cd5f618 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Prathmesh Prabhue8182312020-03-06 23:33:07 -08008import collections
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wu835dee22017-09-07 10:47:29 -070013
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070014import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070015from google.appengine.api import taskqueue
16from google.appengine.runtime import apiproxy_errors
17
18
# Name of the pull queue that holds suites waiting to be kicked off.
SUITES_QUEUE = 'suitesQueue'

# Tunables for a TaskProcessor:
#   batch_size: upper bound on the number of successfully kicked-off tasks
#       in one batch_execute() call.
#   multirequest_size: number of tasks leased from the queue (and handed to
#       the test platform client) per request.
Options = collections.namedtuple('Options',
                                 ['batch_size', 'multirequest_size'])

# Options used by new_task_processor().
_DEFAULT_OPTIONS = Options(
    batch_size=100,
    multirequest_size=constants.Buildbucket.MULTIREQUEST_SIZE,
)
28
Xixuan Wu835dee22017-09-07 10:47:29 -070029
def new_task_processor():
  """Factory function to create a task processor appropriate to environment.

  Returns:
    A TaskProcessor bound to the SUITES_QUEUE pull queue and configured with
    the module's default options.
  """
  return TaskProcessor(queue_name=SUITES_QUEUE, options=_DEFAULT_OPTIONS)
33
34
class TaskProcessor(object):
  """A class capable of executing tasks by kicking off suites.

  This class fetches tasks from pullqueue, and kicks off suites
  represented by tasks' params.
  """

  def __init__(self, queue_name, options):
    """Initialize a task executor for further pulling & execution.

    Args:
      queue_name: The name of a pull queue.
      options: Options to configure the task processor. batch_execute() reads
        its batch_size and multirequest_size attributes.
    """
    self.queue = taskqueue.Queue(queue_name)
    self._options = options

    # Schedule tests to PROD_BUILDER only when running in the production
    # environment of the production app; otherwise use STAGING_BUILDER.
    in_production = (
        constants.environment() == constants.RunningEnv.ENV_PROD and
        constants.application_id() == constants.AppID.PROD_APP)
    if in_production:
      builder = constants.Buildbucket.PROD_BUILDER
    else:
      builder = constants.Buildbucket.STAGING_BUILDER

    self.test_platform_client = buildbucket.TestPlatformClient(
        constants.Buildbucket.HOST,
        constants.Buildbucket.PROJECT,
        constants.Buildbucket.BUCKET,
        builder)

  def batch_execute(self):
    """Lease tasks from the queue in groups and kick them off.

    Each round leases up to options.multirequest_size tasks, passes them to
    the test platform client, and deletes from the queue the tasks the client
    reports as executed. Rounds continue while another full group still fits
    within options.batch_size successfully executed tasks, and stop as soon
    as a lease comes back empty.

    A failed kickoff is logged and does not abort the batch; its tasks are
    not deleted and do not count towards the batch budget.

    Raises:
      taskqueue.UnknownQueueError, taskqueue.TransientError,
      apiproxy_errors.DeadlineExceededError: if leasing tasks fails. The
        error is logged and re-raised.
    """
    options = self._options
    kicked_off_total = 0
    while kicked_off_total + options.multirequest_size <= options.batch_size:
      try:
        # Positional arguments: lease duration (seconds) and max task count.
        leased = self.queue.lease_tasks_by_tag(
            3600, options.multirequest_size, deadline=60)
      except (taskqueue.UnknownQueueError,
              taskqueue.TransientError,
              apiproxy_errors.DeadlineExceededError) as e:
        logging.exception(e)
        raise

      if not leased:
        # Nothing left to lease; the batch is done.
        return

      # The first task's tag is used to label the whole leased group.
      suite_tag = leased[0].tag
      kicked_off = []
      try:
        kicked_off.extend(
            self.test_platform_client.multirequest_run(leased, suite_tag))
      except (ValueError,
              buildbucket.BuildbucketRunError,
              apiclient.errors.HttpError):
        logging.exception('Failed to kick off %d tasks for suite %s',
                          len(leased), suite_tag)
      finally:
        # Runs on success and on failure alike, so whatever was collected in
        # kicked_off before an error is still counted and removed.
        if kicked_off:
          kicked_off_total += len(kicked_off)
          logging.info('Successfully kicking %d tasks for suite %s',
                       len(kicked_off), suite_tag)
          self.queue.delete_tasks(kicked_off)

  def purge(self):
    """Purge the entire tasks in the task queue."""
    self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070099
100
def push(queue_name, tag=None, **suite_kwargs):
  """Push suites to suite queue for later kickoff.

  Args:
    queue_name: the name of a pull queue.
    tag: tag of a pull queue task.
    **suite_kwargs: the args for a suite to kick off; stored as the params
      of the new task.
  """
  queue = taskqueue.Queue(queue_name)
  pull_task = taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs)
  queue.add(pull_task)
Xinan Lin9e4917d2019-11-04 10:58:47 -0800110 queue.add(taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs))