blob: 94dc9abdac51c32b9cb341105f71223a6294da3c [file] [log] [blame]
Xixuan Wu835dee22017-09-07 10:47:29 -07001# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for executing tasks queued by suite scheduler."""
6
7import logging
8
Xixuan Wua5a29442017-10-11 11:03:02 -07009import global_config
Xixuan Wu835dee22017-09-07 10:47:29 -070010import swarming_lib
11
12from google.appengine.api import taskqueue
13from google.appengine.runtime import apiproxy_errors
14
15
16SUITES_QUEUE = 'suitesQueue'
17BATCH_SIZE = 100
18
19
20class TaskProcessor(object):
21 """A class capable of executing tasks by kicking off suites.
22
23 This class fetches tasks from pullqueue, and kicking off suites
24 represented by tasks' params through ChromeOS swarming proxy server.
25 """
26
27 def __init__(self, queue_name):
28 """Initialize a task executor for further pulling & execution.
29
30 Args:
31 queue_name: the name of a pull queue.
32 """
Xixuan Wua5a29442017-10-11 11:03:02 -070033 self.queue = taskqueue.Queue(queue_name)
34 self.swarming = swarming_lib.SwarmingRunner()
Xixuan Wu835dee22017-09-07 10:47:29 -070035
36 def batch_execute(self):
37 try:
Xixuan Wua5a29442017-10-11 11:03:02 -070038 tasks = self.queue.lease_tasks(3600, BATCH_SIZE, deadline=60)
Xixuan Wu835dee22017-09-07 10:47:29 -070039 except (taskqueue.UnknownQueueError,
40 taskqueue.TransientError,
41 apiproxy_errors.DeadlineExceededError) as e:
42 logging.exception(e)
43 raise
44
45 if tasks:
46 executed_tasks = []
47 try:
48 for task in tasks:
Xixuan Wua5a29442017-10-11 11:03:02 -070049 if global_config.GAE_TESTING:
50 self.swarming.dummy_run()
51 else:
52 self.swarming.run(**task.extract_params())
53
Xixuan Wu835dee22017-09-07 10:47:29 -070054 executed_tasks.append(task)
55
Xixuan Wu2ba72652017-09-15 15:49:42 -070056 except (ValueError, swarming_lib.SwarmingRunError) as e:
Xixuan Wu835dee22017-09-07 10:47:29 -070057 logging.exception(e)
58 raise
59 finally:
60 if executed_tasks:
61 logging.info('Successfully kicking %d tasks', len(executed_tasks))
Xixuan Wua5a29442017-10-11 11:03:02 -070062 self.queue.delete_tasks(executed_tasks)
63
64 def purge(self):
65 """Purge the entire tasks in the task queue."""
66 self.queue.purge()
Xixuan Wu835dee22017-09-07 10:47:29 -070067
68
69def push(queue_name, **suite_kwargs):
70 """Push suites to suite queue for later kickoff.
71
72 Args:
73 queue_name: the name of a pull queue.
74 **suite_kwargs: the args for a suite to kick off.
75 """
76 queue = taskqueue.Queue(queue_name)
77 queue.add(taskqueue.Task(method='PULL', params=suite_kwargs))