# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for executing tasks queued by suite scheduler."""
# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Xinan Lin3ba18a02019-08-13 15:44:55 -07008import ast
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wua5a29442017-10-11 11:03:02 -070013import global_config
Xixuan Wu835dee22017-09-07 10:47:29 -070014import swarming_lib
15
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070016import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070017from google.appengine.api import taskqueue
18from google.appengine.runtime import apiproxy_errors
19
20
# Name of a suites pull queue.
SUITES_QUEUE = 'suitesQueue'
# Maximum number of tasks leased from the queue in a single batch.
BATCH_SIZE = 100
23
24
class TaskProcessor(object):
  """A class capable of executing tasks by kicking off suites.

  This class fetches tasks from a pull queue and kicks off the suites
  represented by the tasks' params, either through the ChromeOS swarming
  proxy server or, for frontdoor-enabled tasks, through the Buildbucket
  test platform client.
  """

  def __init__(self, queue_name, afe_server, skylab_swarming_server):
    """Initialize a task executor for further pulling & execution.

    Args:
      queue_name: The name of a pull queue.
      afe_server: A string of the address for the afe_server.
      skylab_swarming_server: A string of swarming server url for skylab.
    """
    self.queue = taskqueue.Queue(queue_name)
    self.swarming = swarming_lib.SwarmingRunner(
        afe_server, skylab_swarming_server)
    # Schedule tests to PROD_BUILDER if in production project, otherwise use
    # STAGING_BUILDER.
    builder = constants.Buildbucket.STAGING_BUILDER
    if (constants.environment() == constants.RunningEnv.ENV_PROD and
        constants.application_id() == constants.AppID.PROD_APP):
      builder = constants.Buildbucket.PROD_BUILDER
    self.test_platform_client = buildbucket.TestPlatformClient(
        constants.Buildbucket.HOST,
        constants.Buildbucket.PROJECT,
        constants.Buildbucket.BUCKET,
        builder)

  def batch_execute(self):
    """Lease a batch of tasks from the queue and kick each of them off.

    Up to BATCH_SIZE tasks are leased for 3600 seconds. Tasks that are kicked
    off successfully are deleted from the queue. Tasks failing with a
    ValueError, SwarmingRunError or HttpError are logged and not deleted.

    Raises:
      taskqueue.UnknownQueueError: leasing failed; logged and re-raised.
      taskqueue.TransientError: leasing failed; logged and re-raised.
      apiproxy_errors.DeadlineExceededError: leasing failed; logged and
        re-raised.
    """
    try:
      tasks = self.queue.lease_tasks(3600, BATCH_SIZE, deadline=60)
    except (taskqueue.UnknownQueueError,
            taskqueue.TransientError,
            apiproxy_errors.DeadlineExceededError) as e:
      logging.exception(e)
      raise

    if not tasks:
      return

    executed_tasks = []
    try:
      for task in tasks:
        # Reset per task so the failure log below never reads an unbound
        # name (extract_params failing on the first task) or the previous
        # task's params.
        params = None
        try:
          params = task.extract_params()
          self._execute(**params)
        except (ValueError, swarming_lib.SwarmingRunError,
                apiclient.errors.HttpError):
          logging.exception('Failed to kick off %r', params)
        else:
          executed_tasks.append(task)
    finally:
      # Runs even when an unexpected error aborts the loop, so tasks that
      # were already kicked off are still removed from the queue.
      if executed_tasks:
        logging.info('Successfully kicking %d tasks', len(executed_tasks))
        self.queue.delete_tasks(executed_tasks)

  def _execute(self, **params):
    """A wrapper to execute each single task.

    Args:
      **params: the params of a single task. 'is_frontdoor', when present,
        must be a Python literal in string form (e.g. 'True').

    Returns:
      The result of the test platform client or swarming run, or None for a
      dummy run.

    Raises:
      ValueError: if 'is_frontdoor' cannot be parsed as a Python literal.
    """
    # No dummy run for frontdoor enabled test.
    try:
      is_frontdoor = ast.literal_eval(params.get('is_frontdoor', 'False'))
    except SyntaxError as e:
      # literal_eval reports some malformed strings as SyntaxError instead of
      # ValueError. Normalize it so batch_execute's per-task error handling
      # applies rather than the whole batch being aborted.
      raise ValueError('Invalid is_frontdoor value %r: %s' %
                       (params.get('is_frontdoor'), e))
    if is_frontdoor:
      return self.test_platform_client.run(**params)

    # Test environment.
    if global_config.GAE_TESTING:
      self.swarming.dummy_run(**params)
      return
    # Production environment.
    return self.swarming.run(**params)

  def purge(self):
    """Purge the entire tasks in the task queue."""
    self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070099
100
def push(queue_name, **suite_kwargs):
  """Add a suite to a pull queue so that it can be kicked off later.

  Args:
    queue_name: the name of the pull queue that receives the task.
    **suite_kwargs: the args of the suite, stored as the task's params.
  """
  suite_queue = taskqueue.Queue(queue_name)
  pull_task = taskqueue.Task(method='PULL', params=suite_kwargs)
  suite_queue.add(pull_task)