blob: 93584ffecf9285f94652c9b6e61c8c4f5156d830 [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Prathmesh Prabhue8182312020-03-06 23:33:07 -08008import collections
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wu835dee22017-09-07 10:47:29 -070013
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070014import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070015from google.appengine.api import taskqueue
16from google.appengine.runtime import apiproxy_errors
17
18
19SUITES_QUEUE = 'suitesQueue'
Xixuan Wu835dee22017-09-07 10:47:29 -070020
Prathmesh Prabhue8182312020-03-06 23:33:07 -080021Options = collections.namedtuple('Options',
22 ['batch_size', 'multirequest_size'])
23
24_DEFAULT_OPTIONS = Options(
Prathmesh Prabhuc23748e2020-03-07 00:00:51 -080025 batch_size=100,
Prathmesh Prabhue8182312020-03-06 23:33:07 -080026 multirequest_size=constants.Buildbucket.MULTIREQUEST_SIZE,
27)
28
Xixuan Wu835dee22017-09-07 10:47:29 -070029
30class TaskProcessor(object):
31 """A class capable of executing tasks by kicking off suites.
32
33 This class fetches tasks from pullqueue, and kicking off suites
34 represented by tasks' params through ChromeOS swarming proxy server.
35 """
36
Prathmesh Prabhue8182312020-03-06 23:33:07 -080037 def __init__(self, queue_name, options=_DEFAULT_OPTIONS):
Xixuan Wu835dee22017-09-07 10:47:29 -070038 """Initialize a task executor for further pulling & execution.
39
40 Args:
Xinan Lin3ba18a02019-08-13 15:44:55 -070041 queue_name: The name of a pull queue.
Prathmesh Prabhu7b961d52020-03-06 23:57:42 -080042 options: Options to configure the task processor.
Xixuan Wu835dee22017-09-07 10:47:29 -070043 """
Xixuan Wua5a29442017-10-11 11:03:02 -070044 self.queue = taskqueue.Queue(queue_name)
Prathmesh Prabhue8182312020-03-06 23:33:07 -080045 self._options = options
linxinane5eb4552019-08-26 05:44:45 +000046 # Schedule tests to PROD_BUILDER if in production project, otherwise use
47 # STAGING_BUILDER.
48 builder = constants.Buildbucket.STAGING_BUILDER
49 if (constants.environment() == constants.RunningEnv.ENV_PROD and
50 constants.application_id() == constants.AppID.PROD_APP):
51 builder = constants.Buildbucket.PROD_BUILDER
Xinan Lin3ba18a02019-08-13 15:44:55 -070052 self.test_platform_client = buildbucket.TestPlatformClient(
53 constants.Buildbucket.HOST,
54 constants.Buildbucket.PROJECT,
55 constants.Buildbucket.BUCKET,
linxinane5eb4552019-08-26 05:44:45 +000056 builder)
Xixuan Wu835dee22017-09-07 10:47:29 -070057
58 def batch_execute(self):
Craig Bergstrom58263d32018-04-26 14:11:35 -060059 """Execute tasks."""
Xinan Lin9e4917d2019-11-04 10:58:47 -080060 executed_tasks_count = 0
Prathmesh Prabhue8182312020-03-06 23:33:07 -080061 while (executed_tasks_count + self._options.multirequest_size <=
62 self._options.batch_size):
Xixuan Wu835dee22017-09-07 10:47:29 -070063 try:
Prathmesh Prabhue8182312020-03-06 23:33:07 -080064 tasks = self.queue.lease_tasks_by_tag(
65 3600, self._options.multirequest_size, deadline=60)
Xinan Lin9e4917d2019-11-04 10:58:47 -080066 except (taskqueue.UnknownQueueError,
67 taskqueue.TransientError,
68 apiproxy_errors.DeadlineExceededError) as e:
69 logging.exception(e)
70 raise
Xixuan Wua5a29442017-10-11 11:03:02 -070071
Xinan Lin9e4917d2019-11-04 10:58:47 -080072 executed_tasks = []
73 if not tasks:
74 return
75
76 try:
77 executed_tasks.extend(
78 self.test_platform_client.multirequest_run(tasks, tasks[0].tag))
79 except (ValueError,
80 buildbucket.BuildbucketRunError,
81 apiclient.errors.HttpError) as e:
82 logging.exception('Failed to kick off %d tasks for suite %s',
83 len(tasks), tasks[0].tag)
Xixuan Wu835dee22017-09-07 10:47:29 -070084 finally:
85 if executed_tasks:
Xinan Lin9e4917d2019-11-04 10:58:47 -080086 executed_tasks_count += len(executed_tasks)
87 logging.info('Successfully kicking %d tasks for suite %s',
88 len(executed_tasks), tasks[0].tag)
Xixuan Wua5a29442017-10-11 11:03:02 -070089 self.queue.delete_tasks(executed_tasks)
90
91 def purge(self):
92 """Purge the entire tasks in the task queue."""
93 self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070094
95
Xinan Lin9e4917d2019-11-04 10:58:47 -080096def push(queue_name, tag=None, **suite_kwargs):
Xixuan Wu835dee22017-09-07 10:47:29 -070097 """Push suites to suite queue for later kickoff.
98
99 Args:
100 queue_name: the name of a pull queue.
Xinan Lin9e4917d2019-11-04 10:58:47 -0800101 tag: tag of a pull queue task.
Xixuan Wu835dee22017-09-07 10:47:29 -0700102 **suite_kwargs: the args for a suite to kick off.
103 """
104 queue = taskqueue.Queue(queue_name)
Xinan Lin9e4917d2019-11-04 10:58:47 -0800105 queue.add(taskqueue.Task(method='PULL', tag=tag, params=suite_kwargs))