blob: da37115685f7b41c3e63d92cc3377d02d4490e99 [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
Xinan Lin3ba18a02019-08-13 15:44:55 -07008import ast
Xixuan Wu835dee22017-09-07 10:47:29 -07009import logging
10
Xinan Lin3ba18a02019-08-13 15:44:55 -070011import buildbucket
12import constants
Xixuan Wua5a29442017-10-11 11:03:02 -070013import global_config
Xixuan Wu835dee22017-09-07 10:47:29 -070014import swarming_lib
15
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070016import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070017from google.appengine.api import taskqueue
18from google.appengine.runtime import apiproxy_errors
19
20
21SUITES_QUEUE = 'suitesQueue'
22BATCH_SIZE = 100
23
24
25class TaskProcessor(object):
26 """A class capable of executing tasks by kicking off suites.
27
28 This class fetches tasks from pullqueue, and kicking off suites
29 represented by tasks' params through ChromeOS swarming proxy server.
30 """
31
Xixuan Wu5d700dc2018-08-21 15:13:10 -070032 def __init__(self, queue_name, afe_server, skylab_swarming_server):
Xixuan Wu835dee22017-09-07 10:47:29 -070033 """Initialize a task executor for further pulling & execution.
34
35 Args:
Xinan Lin3ba18a02019-08-13 15:44:55 -070036 queue_name: The name of a pull queue.
37 afe_server: A string of the address for the afe_server.
Xixuan Wu5d700dc2018-08-21 15:13:10 -070038 skylab_swarming_server: A string of swarming server url for skylab.
Xixuan Wu835dee22017-09-07 10:47:29 -070039 """
Xixuan Wua5a29442017-10-11 11:03:02 -070040 self.queue = taskqueue.Queue(queue_name)
Xixuan Wu5d700dc2018-08-21 15:13:10 -070041 self.swarming = swarming_lib.SwarmingRunner(
42 afe_server, skylab_swarming_server)
Xinan Lin3ba18a02019-08-13 15:44:55 -070043 self.test_platform_client = buildbucket.TestPlatformClient(
44 constants.Buildbucket.HOST,
45 constants.Buildbucket.PROJECT,
46 constants.Buildbucket.BUCKET,
47 constants.Buildbucket.PROD_BUILDER)
Xixuan Wu835dee22017-09-07 10:47:29 -070048
49 def batch_execute(self):
Craig Bergstrom58263d32018-04-26 14:11:35 -060050 """Execute tasks."""
Xixuan Wu835dee22017-09-07 10:47:29 -070051 try:
Xixuan Wua5a29442017-10-11 11:03:02 -070052 tasks = self.queue.lease_tasks(3600, BATCH_SIZE, deadline=60)
Xixuan Wu835dee22017-09-07 10:47:29 -070053 except (taskqueue.UnknownQueueError,
54 taskqueue.TransientError,
55 apiproxy_errors.DeadlineExceededError) as e:
56 logging.exception(e)
57 raise
58
59 if tasks:
60 executed_tasks = []
61 try:
62 for task in tasks:
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070063 try:
64 params = task.extract_params()
Xinan Lin3ba18a02019-08-13 15:44:55 -070065 self._execute(**params)
Xixuan Wua5a29442017-10-11 11:03:02 -070066
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070067 executed_tasks.append(task)
68 except (ValueError, swarming_lib.SwarmingRunError,
69 apiclient.errors.HttpError) as e:
70 logging.exception('Failed to kick off %r', params)
Xixuan Wu835dee22017-09-07 10:47:29 -070071 finally:
72 if executed_tasks:
73 logging.info('Successfully kicking %d tasks', len(executed_tasks))
Xixuan Wua5a29442017-10-11 11:03:02 -070074 self.queue.delete_tasks(executed_tasks)
75
Xinan Lin3ba18a02019-08-13 15:44:55 -070076 def _execute(self, **params):
77 """A wrapper to execute each single task."""
78 is_frontdoor = ast.literal_eval(params.get('is_frontdoor', 'False'))
79 # Test environment.
80 if global_config.GAE_TESTING:
81 if is_frontdoor:
82 self.test_platform_client.dummy_run()
83 return
84 self.swarming.dummy_run(**params)
85 return
86 # Production environment.
87 if is_frontdoor:
88 return self.test_platform_client.run(**params)
89 return self.swarming.run(**params)
90
Xixuan Wua5a29442017-10-11 11:03:02 -070091 def purge(self):
92 """Purge the entire tasks in the task queue."""
93 self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070094
95
96def push(queue_name, **suite_kwargs):
97 """Push suites to suite queue for later kickoff.
98
99 Args:
100 queue_name: the name of a pull queue.
101 **suite_kwargs: the args for a suite to kick off.
102 """
103 queue = taskqueue.Queue(queue_name)
104 queue.add(taskqueue.Task(method='PULL', params=suite_kwargs))