blob: 7cf4344dcd945af7fdff70d5c78619f7d23f9578 [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -07006# pylint: disable=g-bad-import-order
Xixuan Wu835dee22017-09-07 10:47:29 -07007
8import logging
9
Xixuan Wua5a29442017-10-11 11:03:02 -070010import global_config
Xixuan Wu835dee22017-09-07 10:47:29 -070011import swarming_lib
12
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070013import apiclient
Xixuan Wu835dee22017-09-07 10:47:29 -070014from google.appengine.api import taskqueue
15from google.appengine.runtime import apiproxy_errors
16
17
18SUITES_QUEUE = 'suitesQueue'
19BATCH_SIZE = 100
20
21
22class TaskProcessor(object):
23 """A class capable of executing tasks by kicking off suites.
24
25 This class fetches tasks from pullqueue, and kicking off suites
26 represented by tasks' params through ChromeOS swarming proxy server.
27 """
28
Xixuan Wu5d700dc2018-08-21 15:13:10 -070029 def __init__(self, queue_name, afe_server, skylab_swarming_server):
Xixuan Wu835dee22017-09-07 10:47:29 -070030 """Initialize a task executor for further pulling & execution.
31
32 Args:
33 queue_name: the name of a pull queue.
Craig Bergstrom3212fb42018-04-17 19:24:15 -060034 afe_server: A str- the address of the afe_server.
Xixuan Wu5d700dc2018-08-21 15:13:10 -070035 skylab_swarming_server: A string of swarming server url for skylab.
Xixuan Wu835dee22017-09-07 10:47:29 -070036 """
Xixuan Wua5a29442017-10-11 11:03:02 -070037 self.queue = taskqueue.Queue(queue_name)
Xixuan Wu5d700dc2018-08-21 15:13:10 -070038 self.swarming = swarming_lib.SwarmingRunner(
39 afe_server, skylab_swarming_server)
Xixuan Wu835dee22017-09-07 10:47:29 -070040
41 def batch_execute(self):
Craig Bergstrom58263d32018-04-26 14:11:35 -060042 """Execute tasks."""
Xixuan Wu835dee22017-09-07 10:47:29 -070043 try:
Xixuan Wua5a29442017-10-11 11:03:02 -070044 tasks = self.queue.lease_tasks(3600, BATCH_SIZE, deadline=60)
Xixuan Wu835dee22017-09-07 10:47:29 -070045 except (taskqueue.UnknownQueueError,
46 taskqueue.TransientError,
47 apiproxy_errors.DeadlineExceededError) as e:
48 logging.exception(e)
49 raise
50
51 if tasks:
52 executed_tasks = []
53 try:
54 for task in tasks:
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070055 try:
56 params = task.extract_params()
57 if global_config.GAE_TESTING:
58 self.swarming.dummy_run()
Xixuan Wua17e3282018-08-21 16:11:11 -070059 self.swarming.dummy_run(is_skylab=True)
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070060 else:
61 self.swarming.run(**params)
Xixuan Wua5a29442017-10-11 11:03:02 -070062
Xixuan Wu0a8d3ee2017-10-19 11:33:26 -070063 executed_tasks.append(task)
64 except (ValueError, swarming_lib.SwarmingRunError,
65 apiclient.errors.HttpError) as e:
66 logging.exception('Failed to kick off %r', params)
Xixuan Wu835dee22017-09-07 10:47:29 -070067 finally:
68 if executed_tasks:
69 logging.info('Successfully kicking %d tasks', len(executed_tasks))
Xixuan Wua5a29442017-10-11 11:03:02 -070070 self.queue.delete_tasks(executed_tasks)
71
72 def purge(self):
73 """Purge the entire tasks in the task queue."""
74 self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070075
76
77def push(queue_name, **suite_kwargs):
78 """Push suites to suite queue for later kickoff.
79
80 Args:
81 queue_name: the name of a pull queue.
82 **suite_kwargs: the args for a suite to kick off.
83 """
84 queue = taskqueue.Queue(queue_name)
85 queue.add(taskqueue.Task(method='PULL', params=suite_kwargs))