blob: e07bb9c362dfc66a539a08f2eff788494ada2d9c [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for executing tasks queued by suite scheduler."""
6
7import logging
8
9import swarming_lib
10
11from google.appengine.api import taskqueue
12from google.appengine.runtime import apiproxy_errors
13
14
# Queue name for suite tasks. Not referenced elsewhere in this module's
# visible code; presumably passed by callers as `queue_name` -- TODO confirm.
SUITES_QUEUE = 'suitesQueue'
# Maximum number of tasks leased per TaskProcessor.batch_execute() call.
BATCH_SIZE = 100
17
18
class TaskProcessor(object):
  """A class capable of executing tasks by kicking off suites.

  This class fetches tasks from a pull queue and kicks off the suites
  represented by the tasks' params through the ChromeOS swarming proxy
  server.
  """

  # Lease duration, in seconds, passed to Queue.lease_tasks().
  _LEASE_SECONDS = 3600
  # RPC deadline, in seconds, passed to Queue.lease_tasks().
  _LEASE_DEADLINE_SECONDS = 60

  def __init__(self, queue_name):
    """Initialize a task executor for further pulling & execution.

    Args:
      queue_name: the name of a pull queue.
    """
    self._queue = taskqueue.Queue(queue_name)
    self._swarming = swarming_lib.SwarmingRunner()

  def batch_execute(self):
    """Lease one batch of tasks, kick each one off, delete the finished ones.

    At most BATCH_SIZE tasks are leased per call. Each task's params are
    passed as keyword arguments to the swarming runner's run(). Tasks that
    were kicked off before a failure are still deleted from the queue; the
    failing task and any tasks after it in the batch are not deleted.

    Raises:
      taskqueue.UnknownQueueError, taskqueue.TransientError,
      apiproxy_errors.DeadlineExceededError: leasing failed; the error is
        logged and re-raised.
      ValueError: raised while kicking off a task; logged and re-raised.
    """
    try:
      tasks = self._queue.lease_tasks(
          self._LEASE_SECONDS, BATCH_SIZE,
          deadline=self._LEASE_DEADLINE_SECONDS)
    except (taskqueue.UnknownQueueError,
            taskqueue.TransientError,
            apiproxy_errors.DeadlineExceededError) as e:
      logging.exception(e)
      raise

    if not tasks:
      # Nothing was leased, so there is nothing to run or delete.
      return

    executed_tasks = []
    try:
      for task in tasks:
        self._swarming.run(**task.extract_params())
        # Record the task only after run() returned without raising.
        executed_tasks.append(task)
    except ValueError as e:
      logging.exception(e)
      raise
    finally:
      # Runs on success and on failure, so tasks already kicked off are
      # removed from the queue either way.
      if executed_tasks:
        logging.info('Successfully kicking %d tasks', len(executed_tasks))
        self._queue.delete_tasks(executed_tasks)
58
59
def push(queue_name, **suite_kwargs):
  """Push a suite to a pull queue for later kickoff.

  A single pull-method task is created whose params are the given keyword
  arguments, and it is added to the named queue.

  Args:
    queue_name: the name of a pull queue.
    **suite_kwargs: the args for a suite to kick off.
  """
  queue = taskqueue.Queue(queue_name)
  queue.add(taskqueue.Task(method='PULL', params=suite_kwargs))