sequences: schedule jobs within a sequence job.
Instead of launching tests that last a week or more,
sequence a large number of smaller tests (a few hours each).
This way, if moblab crashes, the current test will be marked as failed,
but the other tests in the suite will still be scheduled.
Note: the retention test is still 7 days long.
BUG=chromium:372952
TEST=Run storage_qual_suspend_quick.
Check the child job control files are correct,
check the AFE is taking the tests into account.
Change-Id: I6c276dba1e537e454d9098bc2e99209a871fec57
Signed-off-by: Gwendal Grignou <gwendal@chromium.org>
Reviewed-on: https://chromium-review.googlesource.com/270789
Reviewed-by: Simran Basi <sbasi@chromium.org>
diff --git a/server/sequence.py b/server/sequence.py
new file mode 100644
index 0000000..9c59a14
--- /dev/null
+++ b/server/sequence.py
@@ -0,0 +1,155 @@
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Sequence extensions to server_job.
+Adds ability to schedule jobs on given machines.
+"""
+
+import common
+from autotest_lib.client.common_lib import control_data
+from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
+from autotest_lib.site_utils import job_directories
+
+MINUTE_IN_SECS = 60
+HOUR_IN_MINUTES = 60
+HOUR_IN_SECS = HOUR_IN_MINUTES * MINUTE_IN_SECS
+DAY_IN_HOURS = 24
+DAY_IN_SECS = DAY_IN_HOURS*HOUR_IN_SECS
+
+DEFAULT_JOB_TIMEOUT_IN_MINS = 4 * HOUR_IN_MINUTES  # 4h child job default
+
+class SequenceJob(object):
+ """Define part of a sequence that will be scheduled by the sequence test."""
+
+ CONTROL_FILE = """
+def run(machine):
+ job.run_test('%s', client_ip=machine, %s)
+
+parallel_simple(run, machines)
+"""
+
+ def __init__(self, name, args, iteration=1, duration=None):
+ """
+ Constructor
+
+ @param name: name of the server test to run.
+ @param args: dictionary of arguments for the server test (may be None).
+ @param iteration: number of copies of this test to schedule.
+ @param duration: expected duration of the test (in seconds).
+ """
+ self._name = name
+ self._args = args or {}
+ self._iteration = iteration
+ self._duration = duration
+
+
+ def child_job_name(self, machine, iteration_number):
+ """
+ Return a name for a child job.
+
+ @param machine: machine name on which the test will run.
+ @param iteration_number: number between 0 and self._iteration - 1.
+ @returns the machine, the test name, the 'tag' argument when set and,
+ for more than one iteration, the iteration number, joined by '_'.
+ """
+ name_parts = [machine, self._name]
+ tag = self._args.get('tag')
+ if tag:
+ name_parts.append(tag)
+ if self._iteration > 1:
+ name_parts.append(str(iteration_number))
+ return '_'.join(name_parts)
+
+
+ def child_job_timeout(self):
+ """
+ Get the child job timeout in minutes.
+
+ When a duration was given, the timeout is twice that duration.
+
+ @returns a timeout value for the test in minutes, 4h by default.
+ """
+ if self._duration:
+ return 2 * int(self._duration) / MINUTE_IN_SECS
+ # No expected duration was given: use the default value.
+ return DEFAULT_JOB_TIMEOUT_IN_MINS
+
+
+ def child_control_file(self):
+ """
+ Populate the template control file.
+
+ Populate it with the test name and expand the arguments
+ list.
+
+ Each argument is rendered as name=repr(value); when a duration was
+ given, it is appended as an extra duration=<value> argument.
+
+ @returns a fully built control file to be used for the child job.
+ """
+ child_args = []
+ for arg, value in self._args.iteritems():
+ child_args.append('%s=%s' % (arg, repr(value)))
+ if self._duration:
+ child_args.append('duration=%d' % self._duration)
+ return self.CONTROL_FILE % (self._name, ', '.join(child_args))
+
+
+ def schedule(self, job, timeout_mins, machine):
+ """
+ Sequence a job on the running AFE.
+
+ Will schedule a given test on the job machine(s).
+ Support a subset of tests:
+ - server job
+ - no hostless.
+ - no cleanup around tests.
+
+ @param job: server_job object that will serve as parent.
+ @param timeout_mins: base timeout in minutes: iteration i is created
+ with a timeout of timeout_mins + (i + 1) * child_job_timeout().
+ @param machine: machine to run the test on.
+
+ @returns child_job_timeout() multiplied by the number of iterations.
+ """
+ afe = frontend_wrappers.RetryingAFE(timeout_min=30, delay_sec=10,
+ user=job.user, debug=False)
+ current_job_id = job_directories.get_job_id_or_task_id(job.resultdir)
+ runtime_mins = self.child_job_timeout()
+
+ for i in xrange(0, self._iteration):
+ afe.create_job(
+ self.child_control_file(),
+ name=self.child_job_name(machine, i),
+ priority='Medium',
+ control_type=control_data.CONTROL_TYPE.SERVER,
+ hosts=[machine], meta_hosts=(), one_time_hosts=(),
+ atomic_group_name=None, synch_count=None, is_template=False,
+ timeout_mins=timeout_mins + (i + 1) * runtime_mins,
+ max_runtime_mins=runtime_mins,
+ run_verify=False, email_list='', dependencies=(),
+ reboot_before=None, reboot_after=None,
+ parse_failed_repair=None,
+ hostless=False, keyvals=None,
+ drone_set=None, image=None,
+ parent_job_id=current_job_id, test_retry=0, run_reset=False,
+ require_ssp=None)
+ return runtime_mins * self._iteration
+
+
+def sequence_schedule(job, machines, server_tests):
+ """
+ Schedule the tests to run.
+
+ Launch all the tests in the sequence on all machines.
+ The timeout passed to each test is the sum of earlier schedule() returns.
+
+ @param job: Job running.
+ @param machines: machines to run on.
+ @param server_tests: Array of objects with a schedule() method.
+ """
+ for machine in machines:
+ timeout_mins = 0
+ for test in server_tests:
+ timeout_mins += test.schedule(job, timeout_mins, machine)