blob: 77e12e1f5d1a706319db9317e27ea167dac25c48 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
Eric Li861b2d52011-02-04 14:50:35 -08009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
Paul Pendleburyff1076d2011-03-31 14:45:32 -070010import Queue, threading
mblighfc3da5b2010-01-06 18:37:22 +000011import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
showard75cdfee2009-06-10 17:40:41 +000012from autotest_lib.client.bin import sysinfo
mbligh0d0f67d2009-11-06 03:15:03 +000013from autotest_lib.client.common_lib import base_job
mbligh09108442008-10-15 16:27:38 +000014from autotest_lib.client.common_lib import error, log, utils, packages
showard75cdfee2009-06-10 17:40:41 +000015from autotest_lib.client.common_lib import logging_manager
Paul Pendleburyff1076d2011-03-31 14:45:32 -070016from autotest_lib.server import autotest, hosts, site_host_attributes
jadmanski043e1132008-11-19 17:10:32 +000017from autotest_lib.server import test, subcommand, profilers
mbligh0a883702010-04-21 01:58:34 +000018from autotest_lib.server.hosts import abstract_ssh
jadmanski10646442008-08-13 14:05:21 +000019from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000020
21
mbligh084bc172008-10-18 14:02:45 +000022def _control_segment_path(name):
23 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000024 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000025 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000026
27
# Standard control file names within a job's results directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Pre-resolved paths of the server-side control segment scripts that are
# executed around the user-supplied control file (see base_server_job.run).
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000040
41
mbligh062ed152009-01-13 00:57:14 +000042# by default provide a stub that generates no site data
43def _get_site_job_data_dummy(job):
44 return {}
45
46
# load up site-specific code for generating site-specific job data;
# falls back to the dummy implementation above when no site module exists
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)
jadmanski10646442008-08-13 14:05:21 +000051
52
class status_indenter(base_job.status_indenter):
    """A status indenter backed by a plain integer counter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        """The current indentation level."""
        return self._indent


    def increment(self):
        """Move one indentation level deeper."""
        self._indent = self._indent + 1


    def decrement(self):
        """Move one indentation level shallower."""
        self._indent = self._indent - 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class _Snapshot(object):
            """Remembers the indent level at creation time; restore()
            rewinds the owning indenter back to that level."""
            def __init__(self, owner, level):
                self._owner = owner
                self._level = level
            def restore(self):
                self._owner._indent = self._level
        return _Snapshot(self, self._indent)
81
82
class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """Invoke the real hook (_hook) unless we are already inside it.

        No attempt is made to be threadsafe; the sole purpose of the guard
        is to break the otherwise-infinite recursion of
        job.record -> _hook -> job.record -> _hook -> ...
        """
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        # drain the warning loggers and record each new warning
        warning_entries = [base_job.status_log_entry(
                               'WARN', None, None, msg, {}, timestamp=timestamp)
                           for timestamp, msg in job._read_warnings()]
        for warning_entry in warning_entries:
            job.record_entry(warning_entry)
        # echo rendered versions of the warnings plus the new entry to INFO,
        # and feed each rendered line to the job's status parser
        for log_entry in warning_entries + [entry]:
            rendered_entry = job._logger.render_entry(log_entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)
jadmanski2a89dac2010-06-11 14:32:58 +0000127
128
mbligh0d0f67d2009-11-06 03:15:03 +0000129class base_server_job(base_job.base_job):
130 """The server-side concrete implementation of base_job.
jadmanski10646442008-08-13 14:05:21 +0000131
mbligh0d0f67d2009-11-06 03:15:03 +0000132 Optional properties provided by this implementation:
133 serverdir
134 conmuxdir
135
136 num_tests_run
137 num_tests_failed
138
139 warning_manager
140 warning_loggers
jadmanski10646442008-08-13 14:05:21 +0000141 """
142
mbligh0d0f67d2009-11-06 03:15:03 +0000143 _STATUS_VERSION = 1
jadmanski10646442008-08-13 14:05:21 +0000144
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass='',
                 group_name='', tag='',
                 control_filename=SERVER_CONTROL_FILENAME):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: The list of machine hostnames this job runs on.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        """
        super(base_server_job, self).__init__(resultdir=resultdir)

        # NOTE(review): 'path' is assigned but never used in this method
        path = os.path.dirname(__file__)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        # fall back to the invoking user when no username was supplied
        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename

        # take over stdout/stderr so all output is captured in the job logs
        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        # continuous parsing only works for single-machine jobs
        self._using_parser = (self._parse_job and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger; the record hook injects WARN entries and
        # echoes all records to INFO logging (see server_job_record_hook)
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))
238
mbligh0d0f67d2009-11-06 03:15:03 +0000239
240 @classmethod
241 def _find_base_directories(cls):
242 """
243 Determine locations of autodir, clientdir and serverdir. Assumes
244 that this file is located within serverdir and uses __file__ along
245 with relative paths to resolve the location.
246 """
247 serverdir = os.path.abspath(os.path.dirname(__file__))
248 autodir = os.path.normpath(os.path.join(serverdir, '..'))
249 clientdir = os.path.join(autodir, 'client')
250 return autodir, clientdir, serverdir
251
252
253 def _find_resultdir(self, resultdir):
254 """
255 Determine the location of resultdir. For server jobs we expect one to
256 always be explicitly passed in to __init__, so just return that.
257 """
258 if resultdir:
259 return os.path.normpath(resultdir)
260 else:
261 return None
262
jadmanski550fdc22008-11-20 16:32:08 +0000263
    def _get_status_logger(self):
        """Return a reference to the status logger.

        @return: The base_job.status_logger instance created in __init__.
        """
        return self._logger
267
268
jadmanskie432dd22009-01-30 15:04:51 +0000269 @staticmethod
270 def _load_control_file(path):
271 f = open(path)
272 try:
273 control_file = f.read()
274 finally:
275 f.close()
276 return re.sub('\r', '', control_file)
277
278
jadmanski550fdc22008-11-20 16:32:08 +0000279 def _register_subcommand_hooks(self):
mbligh2b92b862008-11-22 13:25:32 +0000280 """
281 Register some hooks into the subcommand modules that allow us
282 to properly clean up self.hosts created in forked subprocesses.
283 """
jadmanski550fdc22008-11-20 16:32:08 +0000284 def on_fork(cmd):
285 self._existing_hosts_on_fork = set(self.hosts)
286 def on_join(cmd):
287 new_hosts = self.hosts - self._existing_hosts_on_fork
288 for host in new_hosts:
289 host.close()
290 subcommand.subcommand.register_fork_hook(on_fork)
291 subcommand.subcommand.register_join_hook(on_join)
292
jadmanski10646442008-08-13 14:05:21 +0000293
    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary. No-op unless continuous parsing was
        enabled in __init__ (single-machine job with a parse_job tag).
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log (opened unbuffered so
        # debug output is flushed immediately)
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model)
        else:
            # job already present: attach the existing db indices to the model
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
320
321
322 def cleanup_parser(self):
mbligh2b92b862008-11-22 13:25:32 +0000323 """
324 This should be called after the server job is finished
jadmanski10646442008-08-13 14:05:21 +0000325 to carry out any remaining cleanup (e.g. flushing any
mbligh2b92b862008-11-22 13:25:32 +0000326 remaining test results to the results db)
327 """
mbligh0d0f67d2009-11-06 03:15:03 +0000328 if not self._using_parser:
jadmanski10646442008-08-13 14:05:21 +0000329 return
330 final_tests = self.parser.end()
331 for test in final_tests:
332 self.__insert_test(test)
mbligh0d0f67d2009-11-06 03:15:03 +0000333 self._using_parser = False
jadmanski10646442008-08-13 14:05:21 +0000334
335
    def verify(self):
        """Run the verify control segment against this job's machines.

        @raises error.AutoservError: If no machines were specified.
        Records an ABORT entry and re-raises if anything inside the
        verify control segment fails.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)
        try:
            # environment exposed to the verify control segment
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self._ssh_user,
                         'ssh_port' : self._ssh_port,
                         'ssh_pass' : self._ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise
351
352
353 def repair(self, host_protection):
354 if not self.machines:
355 raise error.AutoservError('No machines specified to repair')
mbligh0fce4112008-11-27 00:37:17 +0000356 if self.resultdir:
357 os.chdir(self.resultdir)
jadmanski10646442008-08-13 14:05:21 +0000358 namespace = {'machines': self.machines, 'job': self,
mbligh0d0f67d2009-11-06 03:15:03 +0000359 'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,
360 'ssh_pass': self._ssh_pass,
jadmanski10646442008-08-13 14:05:21 +0000361 'protection_level': host_protection}
mbligh25c0b8c2009-01-24 01:44:17 +0000362
mbligh0931b0a2009-04-08 17:44:48 +0000363 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
jadmanski10646442008-08-13 14:05:21 +0000364
365
    def precheck(self):
        """
        Perform any additional checks in derived classes.

        Hook method; the base implementation does nothing.
        """
        pass
371
372
    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.

        Hook method; the base implementation does nothing.
        """
        pass
378
379
    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.

        Hook method; the base implementation does nothing.
        """
        pass
385
386
    def use_external_logging(self):
        """
        Return True if external logging should be used.

        The base implementation always returns False; subclasses that
        support external logging override this.
        """
        return False
392
393
    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple.

        @param function: The per-machine callable to wrap.
        @param machines: The list of machines the call will fan out over.
        @param log: Whether per-machine logging/keyval setup is wanted.
        @return: The wrapper callable (possibly function itself, unwrapped).
        """
        # forking happens unless this is a single-machine call for the job's
        # own (single) machine
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            # parsing enabled: each fork re-targets the job at one machine
            # and runs its own parser over that machine's subdirectory
            def wrapper(machine):
                self._parse_job += "/" + machine
                self._using_parser = True
                self.machines = [machine]
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            # no parsing, but still set up a per-machine results directory
            def wrapper(machine):
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : machine,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            # single machine in-process: no extra setup needed
            wrapper = function
        return wrapper
421
422
    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to output in a subdirectory
                named after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True instead of an AutoServError being raised
                on any error a list of the results|exceptions from the function
                called on each arg is returned.  [default: False]

        @return: Whatever subcommand.parallel_simple returns (the list of
                per-machine results/exceptions when return_results is True).
        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)
446
447
448 def parallel_on_machines(self, function, machines, timeout=None):
449 """
showardcd5fac42009-07-06 20:19:43 +0000450 @param function: Called in parallel with one machine as its argument.
mbligh415dc212009-06-15 21:53:34 +0000451 @param machines: A list of machines to call function(machine) on.
452 @param timeout: Seconds after which the function call should timeout.
453
454 @returns A list of machines on which function(machine) returned
455 without raising an exception.
456 """
showardcd5fac42009-07-06 20:19:43 +0000457 results = self.parallel_simple(function, machines, timeout=timeout,
mbligh415dc212009-06-15 21:53:34 +0000458 return_results=True)
459 success_machines = []
460 for result, machine in itertools.izip(results, machines):
461 if not isinstance(result, Exception):
462 success_machines.append(machine)
463 return success_machines
jadmanski10646442008-08-13 14:05:21 +0000464
465
    def distribute_across_machines(self, tests, machines):
        """Run each test in tests once using machines.

        Instead of running each test on each machine like parallel_on_machines,
        run each test once across all machines. Put another way, the total
        number of tests run by parallel_on_machines is len(tests) *
        len(machines). The number of tests run by distribute_across_machines is
        len(tests).

        Args:
            tests: List of tests to run.
            machines: list of machines to use.
        """
        # The Queue is thread safe, but since a machine may have to search
        # through the queue to find a valid test the lock provides exclusive
        # queue access for more than just the get call.
        test_queue = Queue.Queue()
        test_queue_lock = threading.Lock()

        # one worker thread per machine, all pulling from the shared queue
        machine_workers = [machine_worker(machine, self.resultdir, test_queue,
                                          test_queue_lock)
                           for machine in machines]

        # To (potentially) speed up searching for valid tests create a list of
        # unique attribute sets present in the machines for this job. If sets
        # were hashable we could just use a dictionary for fast verification.
        # This at least reduces the search space from the number of machines to
        # the number of unique machines.
        unique_machine_attributes = []
        for mw in machine_workers:
            if not mw.attribute_set in unique_machine_attributes:
                unique_machine_attributes.append(mw.attribute_set)

        # Only queue tests which are valid on at least one machine.  Record
        # skipped tests in the status.log file using record_skipped_test().
        for test_entry in tests:
            ti = test_item(*test_entry)
            machine_found = False
            for ma in unique_machine_attributes:
                if ti.validate(ma):
                    test_queue.put(ti)
                    machine_found = True
                    break
            if not machine_found:
                self.record_skipped_test(ti)

        # Run valid tests and wait for completion.
        for worker in machine_workers:
            worker.start()
        test_queue.join()
516
517
518 def record_skipped_test(self, skipped_test, message=None):
519 """Insert a failure record into status.log for this test."""
520 msg = message
521 if msg is None:
522 msg = 'No valid machines found for test %s.' % skipped_test
523 logging.info(msg)
524 self.record('START', None, skipped_test.test_name)
525 self.record('INFO', None, skipped_test.test_name, msg)
526 self.record('END TEST_NA', None, skipped_test.test_name, msg)
527
528
    # sentinel: pass as control_file_dir to force use of a temp directory
    _USE_TEMP_DIR = object()
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        """Execute the job's control file and surrounding control segments.

        NOTE(review): the mutable default namespace={} is never mutated
        directly (it is .copy()-ed below), so it is safe, if unidiomatic.
        """
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        # environment exposed to the executed control code
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self._ssh_user
        namespace['ssh_port'] = self._ssh_port
        namespace['ssh_pass'] = self._ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        # collect_crashinfo stays True unless the control file completes
        # without error; the finally block below uses it to decide between
        # full crashinfo collection and plain crashdump collection
        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    # client-side control: run it via the client wrapper
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occured, so we don't need to collect crashinfo
                collect_crashinfo = False
            except Exception, e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': str(e)})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            # only remove the uncollected-logs file if this run created it
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000643
644
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: url of the test to run.
        @param dargs: may contain a 'tag' entry, which is appended to the
                test name (see _build_tagged_test_name).

        @return: True if the test completed, False if it raised a
                TestBaseException (already recorded in the status log).
        @raises: re-raises any non-TestBaseException escaping the test.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            # run the test and record its outcome; exceptions propagate to
            # _run_group, which emits the matching END record
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            # test failure was already recorded; report it as a soft failure
            return False
        elif exc_info:
            # re-raise with the original traceback (Python 2 3-arg raise)
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000678
679
680 def _run_group(self, name, subdir, function, *args, **dargs):
681 """\
682 Underlying method for running something inside of a group.
683 """
jadmanskide292df2008-08-26 20:51:14 +0000684 result, exc_info = None, None
jadmanski10646442008-08-13 14:05:21 +0000685 try:
686 self.record('START', subdir, name)
jadmanski52053632010-06-11 21:08:10 +0000687 result = function(*args, **dargs)
jadmanski10646442008-08-13 14:05:21 +0000688 except error.TestBaseException, e:
jadmanskib88d6dc2009-01-10 00:33:18 +0000689 self.record("END %s" % e.exit_status, subdir, name)
jadmanskide292df2008-08-26 20:51:14 +0000690 exc_info = sys.exc_info()
jadmanski10646442008-08-13 14:05:21 +0000691 except Exception, e:
692 err_msg = str(e) + '\n'
693 err_msg += traceback.format_exc()
694 self.record('END ABORT', subdir, name, err_msg)
695 raise error.JobError(name + ' failed\n' + traceback.format_exc())
696 else:
697 self.record('END GOOD', subdir, name)
698
jadmanskide292df2008-08-26 20:51:14 +0000699 return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000700
701
702 def run_group(self, function, *args, **dargs):
703 """\
704 function:
705 subroutine to run
706 *args:
707 arguments for the function
708 """
709
710 name = function.__name__
711
712 # Allow the tag for the group to be specified.
713 tag = dargs.pop('tag', None)
714 if tag:
715 name = tag
716
jadmanskide292df2008-08-26 20:51:14 +0000717 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000718
719
720 def run_reboot(self, reboot_func, get_kernel_func):
721 """\
722 A specialization of run_group meant specifically for handling
723 a reboot. Includes support for capturing the kernel version
724 after the reboot.
725
726 reboot_func: a function that carries out the reboot
727
728 get_kernel_func: a function that returns a string
729 representing the kernel version.
730 """
jadmanski10646442008-08-13 14:05:21 +0000731 try:
732 self.record('START', None, 'reboot')
jadmanski10646442008-08-13 14:05:21 +0000733 reboot_func()
734 except Exception, e:
jadmanski10646442008-08-13 14:05:21 +0000735 err_msg = str(e) + '\n' + traceback.format_exc()
736 self.record('END FAIL', None, 'reboot', err_msg)
jadmanski4b51d542009-04-08 14:17:16 +0000737 raise
jadmanski10646442008-08-13 14:05:21 +0000738 else:
739 kernel = get_kernel_func()
jadmanski10646442008-08-13 14:05:21 +0000740 self.record('END GOOD', None, 'reboot',
741 optional_fields={"kernel": kernel})
742
743
jadmanskie432dd22009-01-30 15:04:51 +0000744 def run_control(self, path):
745 """Execute a control file found at path (relative to the autotest
746 path). Intended for executing a control file within a control file,
747 not for running the top-level job control file."""
748 path = os.path.join(self.autodir, path)
749 control_file = self._load_control_file(path)
mbligh0d0f67d2009-11-06 03:15:03 +0000750 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
jadmanskie432dd22009-01-30 15:04:51 +0000751
752
jadmanskic09fc152008-10-15 17:56:59 +0000753 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
mbligh4395bbd2009-03-25 19:34:17 +0000754 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
jadmanskic09fc152008-10-15 17:56:59 +0000755 on_every_test)
756
757
758 def add_sysinfo_logfile(self, file, on_every_test=False):
759 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
760
761
762 def _add_sysinfo_loggable(self, loggable, on_every_test):
763 if on_every_test:
764 self.sysinfo.test_loggables.add(loggable)
765 else:
766 self.sysinfo.boot_loggables.add(loggable)
767
768
jadmanski10646442008-08-13 14:05:21 +0000769 def _read_warnings(self):
jadmanskif37df842009-02-11 00:03:26 +0000770 """Poll all the warning loggers and extract any new warnings that have
771 been logged. If the warnings belong to a category that is currently
772 disabled, this method will discard them and they will no longer be
773 retrievable.
774
775 Returns a list of (timestamp, message) tuples, where timestamp is an
776 integer epoch timestamp."""
jadmanski10646442008-08-13 14:05:21 +0000777 warnings = []
778 while True:
779 # pull in a line of output from every logger that has
780 # output ready to be read
mbligh2b92b862008-11-22 13:25:32 +0000781 loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
jadmanski10646442008-08-13 14:05:21 +0000782 closed_loggers = set()
783 for logger in loggers:
784 line = logger.readline()
785 # record any broken pipes (aka line == empty)
786 if len(line) == 0:
787 closed_loggers.add(logger)
788 continue
jadmanskif37df842009-02-11 00:03:26 +0000789 # parse out the warning
790 timestamp, msgtype, msg = line.split('\t', 2)
791 timestamp = int(timestamp)
792 # if the warning is valid, add it to the results
793 if self.warning_manager.is_valid(timestamp, msgtype):
794 warnings.append((timestamp, msg.strip()))
jadmanski10646442008-08-13 14:05:21 +0000795
796 # stop listening to loggers that are closed
797 self.warning_loggers -= closed_loggers
798
799 # stop if none of the loggers have any output left
800 if not loggers:
801 break
802
803 # sort into timestamp order
804 warnings.sort()
805 return warnings
806
807
showardcc929362010-01-25 21:20:41 +0000808 def _unique_subdirectory(self, base_subdirectory_name):
809 """Compute a unique results subdirectory based on the given name.
810
811 Appends base_subdirectory_name with a number as necessary to find a
812 directory name that doesn't already exist.
813 """
814 subdirectory = base_subdirectory_name
815 counter = 1
816 while os.path.exists(os.path.join(self.resultdir, subdirectory)):
817 subdirectory = base_subdirectory_name + '.' + str(counter)
818 counter += 1
819 return subdirectory
820
821
jadmanski52053632010-06-11 21:08:10 +0000822 def get_record_context(self):
823 """Returns an object representing the current job.record context.
824
825 The object returned is an opaque object with a 0-arg restore method
826 which can be called to restore the job.record context (i.e. indentation)
827 to the current level. The intention is that it should be used when
828 something external which generate job.record calls (e.g. an autotest
829 client) can fail catastrophically and the server job record state
830 needs to be reset to its original "known good" state.
831
832 @return: A context object with a 0-arg restore() method."""
833 return self._indenter.get_context()
834
835
showardcc929362010-01-25 21:20:41 +0000836 def record_summary(self, status_code, test_name, reason='', attributes=None,
837 distinguishing_attributes=(), child_test_ids=None):
838 """Record a summary test result.
839
840 @param status_code: status code string, see
841 common_lib.log.is_valid_status()
842 @param test_name: name of the test
843 @param reason: (optional) string providing detailed reason for test
844 outcome
845 @param attributes: (optional) dict of string keyvals to associate with
846 this result
847 @param distinguishing_attributes: (optional) list of attribute names
848 that should be used to distinguish identically-named test
849 results. These attributes should be present in the attributes
850 parameter. This is used to generate user-friendly subdirectory
851 names.
852 @param child_test_ids: (optional) list of test indices for test results
853 used in generating this result.
854 """
855 subdirectory_name_parts = [test_name]
856 for attribute in distinguishing_attributes:
857 assert attributes
858 assert attribute in attributes, '%s not in %s' % (attribute,
859 attributes)
860 subdirectory_name_parts.append(attributes[attribute])
861 base_subdirectory_name = '.'.join(subdirectory_name_parts)
862
863 subdirectory = self._unique_subdirectory(base_subdirectory_name)
864 subdirectory_path = os.path.join(self.resultdir, subdirectory)
865 os.mkdir(subdirectory_path)
866
867 self.record(status_code, subdirectory, test_name,
868 status=reason, optional_fields={'is_summary': True})
869
870 if attributes:
871 utils.write_keyval(subdirectory_path, attributes)
872
873 if child_test_ids:
874 ids_string = ','.join(str(test_id) for test_id in child_test_ids)
875 summary_data = {'child_test_ids': ids_string}
876 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
877 summary_data)
878
879
jadmanski16a7ff72009-04-01 18:19:53 +0000880 def disable_warnings(self, warning_type):
jadmanskif37df842009-02-11 00:03:26 +0000881 self.warning_manager.disable_warnings(warning_type)
jadmanski16a7ff72009-04-01 18:19:53 +0000882 self.record("INFO", None, None,
883 "disabling %s warnings" % warning_type,
884 {"warnings.disable": warning_type})
jadmanskif37df842009-02-11 00:03:26 +0000885
886
jadmanski16a7ff72009-04-01 18:19:53 +0000887 def enable_warnings(self, warning_type):
jadmanskif37df842009-02-11 00:03:26 +0000888 self.warning_manager.enable_warnings(warning_type)
jadmanski16a7ff72009-04-01 18:19:53 +0000889 self.record("INFO", None, None,
890 "enabling %s warnings" % warning_type,
891 {"warnings.enable": warning_type})
jadmanskif37df842009-02-11 00:03:26 +0000892
893
jadmanski779bd292009-03-19 17:33:33 +0000894 def get_status_log_path(self, subdir=None):
895 """Return the path to the job status log.
896
897 @param subdir - Optional paramter indicating that you want the path
898 to a subdirectory status log.
899
900 @returns The path where the status log should be.
901 """
mbligh210bae62009-04-01 18:33:13 +0000902 if self.resultdir:
903 if subdir:
904 return os.path.join(self.resultdir, subdir, "status.log")
905 else:
906 return os.path.join(self.resultdir, "status.log")
jadmanski779bd292009-03-19 17:33:33 +0000907 else:
mbligh210bae62009-04-01 18:33:13 +0000908 return None
jadmanski779bd292009-03-19 17:33:33 +0000909
910
jadmanski6bb32d72009-03-19 20:25:24 +0000911 def _update_uncollected_logs_list(self, update_func):
912 """Updates the uncollected logs list in a multi-process safe manner.
913
914 @param update_func - a function that updates the list of uncollected
915 logs. Should take one parameter, the list to be updated.
916 """
mbligh0d0f67d2009-11-06 03:15:03 +0000917 if self._uncollected_log_file:
918 log_file = open(self._uncollected_log_file, "r+")
mbligha788dc42009-03-26 21:10:16 +0000919 fcntl.flock(log_file, fcntl.LOCK_EX)
jadmanski6bb32d72009-03-19 20:25:24 +0000920 try:
921 uncollected_logs = pickle.load(log_file)
922 update_func(uncollected_logs)
923 log_file.seek(0)
924 log_file.truncate()
925 pickle.dump(uncollected_logs, log_file)
jadmanski3bff9092009-04-22 18:09:47 +0000926 log_file.flush()
jadmanski6bb32d72009-03-19 20:25:24 +0000927 finally:
928 fcntl.flock(log_file, fcntl.LOCK_UN)
929 log_file.close()
930
931
932 def add_client_log(self, hostname, remote_path, local_path):
933 """Adds a new set of client logs to the list of uncollected logs,
934 to allow for future log recovery.
935
936 @param host - the hostname of the machine holding the logs
937 @param remote_path - the directory on the remote machine holding logs
938 @param local_path - the local directory to copy the logs into
939 """
940 def update_func(logs_list):
941 logs_list.append((hostname, remote_path, local_path))
942 self._update_uncollected_logs_list(update_func)
943
944
945 def remove_client_log(self, hostname, remote_path, local_path):
946 """Removes a set of client logs from the list of uncollected logs,
947 to allow for future log recovery.
948
949 @param host - the hostname of the machine holding the logs
950 @param remote_path - the directory on the remote machine holding logs
951 @param local_path - the local directory to copy the logs into
952 """
953 def update_func(logs_list):
954 logs_list.remove((hostname, remote_path, local_path))
955 self._update_uncollected_logs_list(update_func)
956
957
mbligh0d0f67d2009-11-06 03:15:03 +0000958 def get_client_logs(self):
959 """Retrieves the list of uncollected logs, if it exists.
960
961 @returns A list of (host, remote_path, local_path) tuples. Returns
962 an empty list if no uncollected logs file exists.
963 """
964 log_exists = (self._uncollected_log_file and
965 os.path.exists(self._uncollected_log_file))
966 if log_exists:
967 return pickle.load(open(self._uncollected_log_file))
968 else:
969 return []
970
971
mbligh084bc172008-10-18 14:02:45 +0000972 def _fill_server_control_namespace(self, namespace, protect=True):
mbligh2b92b862008-11-22 13:25:32 +0000973 """
974 Prepare a namespace to be used when executing server control files.
mbligh084bc172008-10-18 14:02:45 +0000975
976 This sets up the control file API by importing modules and making them
977 available under the appropriate names within namespace.
978
979 For use by _execute_code().
980
981 Args:
982 namespace: The namespace dictionary to fill in.
983 protect: Boolean. If True (the default) any operation that would
984 clobber an existing entry in namespace will cause an error.
985 Raises:
986 error.AutoservError: When a name would be clobbered by import.
987 """
988 def _import_names(module_name, names=()):
mbligh2b92b862008-11-22 13:25:32 +0000989 """
990 Import a module and assign named attributes into namespace.
mbligh084bc172008-10-18 14:02:45 +0000991
992 Args:
993 module_name: The string module name.
994 names: A limiting list of names to import from module_name. If
995 empty (the default), all names are imported from the module
996 similar to a "from foo.bar import *" statement.
997 Raises:
998 error.AutoservError: When a name being imported would clobber
999 a name already in namespace.
1000 """
1001 module = __import__(module_name, {}, {}, names)
1002
1003 # No names supplied? Import * from the lowest level module.
1004 # (Ugh, why do I have to implement this part myself?)
1005 if not names:
1006 for submodule_name in module_name.split('.')[1:]:
1007 module = getattr(module, submodule_name)
1008 if hasattr(module, '__all__'):
1009 names = getattr(module, '__all__')
1010 else:
1011 names = dir(module)
1012
1013 # Install each name into namespace, checking to make sure it
1014 # doesn't override anything that already exists.
1015 for name in names:
1016 # Check for conflicts to help prevent future problems.
1017 if name in namespace and protect:
1018 if namespace[name] is not getattr(module, name):
1019 raise error.AutoservError('importing name '
1020 '%s from %s %r would override %r' %
1021 (name, module_name, getattr(module, name),
1022 namespace[name]))
1023 else:
1024 # Encourage cleanliness and the use of __all__ for a
1025 # more concrete API with less surprises on '*' imports.
1026 warnings.warn('%s (%r) being imported from %s for use '
1027 'in server control files is not the '
1028 'first occurrance of that import.' %
1029 (name, namespace[name], module_name))
1030
1031 namespace[name] = getattr(module, name)
1032
1033
1034 # This is the equivalent of prepending a bunch of import statements to
1035 # the front of the control script.
mbligha2b07dd2009-06-22 18:26:13 +00001036 namespace.update(os=os, sys=sys, logging=logging)
mbligh084bc172008-10-18 14:02:45 +00001037 _import_names('autotest_lib.server',
1038 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
1039 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
1040 _import_names('autotest_lib.server.subcommand',
1041 ('parallel', 'parallel_simple', 'subcommand'))
1042 _import_names('autotest_lib.server.utils',
1043 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
1044 _import_names('autotest_lib.client.common_lib.error')
1045 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
1046
1047 # Inject ourself as the job object into other classes within the API.
1048 # (Yuck, this injection is a gross thing be part of a public API. -gps)
1049 #
1050 # XXX Base & SiteAutotest do not appear to use .job. Who does?
1051 namespace['autotest'].Autotest.job = self
1052 # server.hosts.base_classes.Host uses .job.
1053 namespace['hosts'].Host.job = self
Eric Li10222b82010-11-24 09:33:15 -08001054 namespace['hosts'].factory.ssh_user = self._ssh_user
1055 namespace['hosts'].factory.ssh_port = self._ssh_port
1056 namespace['hosts'].factory.ssh_pass = self._ssh_pass
mbligh084bc172008-10-18 14:02:45 +00001057
1058
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during
                execution.
            protect: Boolean. If True (the default) a copy of the namespace
                dict is used during execution to prevent the code from
                modifying its contents outside of this function. If False the
                raw dict is passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            # NOTE(review): the .machines file is written to the current
            # working directory -- presumably the results dir; confirm.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # Missing/unreadable file: treat as "needs (re)writing".
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        # Run the control file in the prepared namespace (Python 2 execfile).
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +00001090
1091
jadmanskie29d0e42010-06-17 16:06:52 +00001092 def _parse_status(self, new_line):
mbligh0d0f67d2009-11-06 03:15:03 +00001093 if not self._using_parser:
jadmanski10646442008-08-13 14:05:21 +00001094 return
jadmanskie29d0e42010-06-17 16:06:52 +00001095 new_tests = self.parser.process_lines([new_line])
jadmanski10646442008-08-13 14:05:21 +00001096 for test in new_tests:
1097 self.__insert_test(test)
1098
1099
    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        # Anything at FAIL severity or worse counts as a failed test.
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # Deliberate swallow: a database hiccup must never fail the job;
            # report the problem on stderr instead.
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
1116
mblighcaa62c22008-04-07 21:51:17 +00001117
mblighfc3da5b2010-01-06 18:37:22 +00001118 def preprocess_client_state(self):
1119 """
1120 Produce a state file for initializing the state of a client job.
1121
1122 Creates a new client state file with all the current server state, as
1123 well as some pre-set client state.
1124
1125 @returns The path of the file the state was written into.
1126 """
1127 # initialize the sysinfo state
1128 self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1129
1130 # dump the state out to a tempfile
1131 fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1132 os.close(fd)
mbligha2c99492010-01-27 22:59:50 +00001133
1134 # write_to_file doesn't need locking, we exclusively own file_path
mblighfc3da5b2010-01-06 18:37:22 +00001135 self._state.write_to_file(file_path)
1136 return file_path
1137
1138
    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path: A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError, e:
            # ignore file-not-found errors; anything else is re-raised
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')
1167
1168
mbligh0a883702010-04-21 01:58:34 +00001169 def clear_all_known_hosts(self):
1170 """Clears known hosts files for all AbstractSSHHosts."""
1171 for host in self.hosts:
1172 if isinstance(host, abstract_ssh.AbstractSSHHost):
1173 host.clear_known_hosts()
1174
1175
mbligha7007722009-01-13 00:37:11 +00001176site_server_job = utils.import_site_class(
1177 __file__, "autotest_lib.server.site_server_job", "site_server_job",
1178 base_server_job)
jadmanski0afbb632008-06-06 21:10:57 +00001179
mbligh0a8c3322009-04-28 18:32:19 +00001180class server_job(site_server_job):
jadmanski0afbb632008-06-06 21:10:57 +00001181 pass
jadmanskif37df842009-02-11 00:03:26 +00001182
1183
1184class warning_manager(object):
1185 """Class for controlling warning logs. Manages the enabling and disabling
1186 of warnings."""
1187 def __init__(self):
1188 # a map of warning types to a list of disabled time intervals
1189 self.disabled_warnings = {}
1190
1191
1192 def is_valid(self, timestamp, warning_type):
1193 """Indicates if a warning (based on the time it occured and its type)
1194 is a valid warning. A warning is considered "invalid" if this type of
1195 warning was marked as "disabled" at the time the warning occured."""
1196 disabled_intervals = self.disabled_warnings.get(warning_type, [])
1197 for start, end in disabled_intervals:
1198 if timestamp >= start and (end is None or timestamp < end):
1199 return False
1200 return True
1201
1202
1203 def disable_warnings(self, warning_type, current_time_func=time.time):
1204 """As of now, disables all further warnings of this type."""
1205 intervals = self.disabled_warnings.setdefault(warning_type, [])
1206 if not intervals or intervals[-1][1] is not None:
jadmanski16a7ff72009-04-01 18:19:53 +00001207 intervals.append((int(current_time_func()), None))
jadmanskif37df842009-02-11 00:03:26 +00001208
1209
1210 def enable_warnings(self, warning_type, current_time_func=time.time):
1211 """As of now, enables all further warnings of this type."""
1212 intervals = self.disabled_warnings.get(warning_type, [])
1213 if intervals and intervals[-1][1] is None:
jadmanski16a7ff72009-04-01 18:19:53 +00001214 intervals[-1] = (intervals[-1][0], int(current_time_func()))
Paul Pendleburyff1076d2011-03-31 14:45:32 -07001215
1216
class test_item(object):
    """Adds machine verification logic to the basic test tuple.

    Tests can either be tuples of the existing form ('testName', {args}) or
    the extended form ('testname', {args}, ['include'], ['exclude']) where
    include and exclude are lists of attributes. A machine must have all the
    attributes in include and must not have any of the attributes in exclude
    to be valid for the test.
    """

    def __init__(self, test_name, test_args, include_attribs=None,
                 exclude_attribs=None):
        """Creates an instance of test_item.

        Args:
            test_name: string, name of test to execute.
            test_args: dictionary, arguments to pass into test.
            include_attribs: attributes a machine must have to run the test.
            exclude_attribs: attributes preventing a machine from running it.
        """
        self.test_name = test_name
        self.test_args = test_args
        if include_attribs is None:
            self.inc_set = None
        else:
            self.inc_set = set(include_attribs)
        if exclude_attribs is None:
            self.exc_set = None
        else:
            self.exc_set = set(exclude_attribs)

    def __str__(self):
        """Return an info string of this test."""
        arg_strs = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
        msg = '%s(%s)' % (self.test_name, arg_strs)
        if self.inc_set:
            msg += ' include=%s' % list(self.inc_set)
        if self.exc_set:
            msg += ' exclude=%s' % list(self.exc_set)
        return msg

    def validate(self, machine_attributes):
        """Check if this test can run on a machine with machine_attributes.

        A machine is valid when it carries every include attribute and none
        of the exclude attributes.

        Args:
            machine_attributes: set, True attributes of candidate machine.

        Returns:
            True/False if the machine is valid for this test.
        """
        if self.inc_set is not None and not self.inc_set <= machine_attributes:
            return False
        if self.exc_set is not None and self.exc_set & machine_attributes:
            return False
        return True
1274
1275
class machine_worker(threading.Thread):
    """Thread that runs tests on a remote host machine."""

    def __init__(self, machine, work_dir, test_queue, queue_lock):
        """Creates an instance of machine_worker to run tests on a remote host.

        Retrieves the host attributes for this machine and creates the set of
        True attributes to validate against test include/exclude attributes.

        Creates a directory to hold the log files for tests run and writes the
        hostname and tko parser version into the keyvals file.

        Args:
            machine: name of remote host.
            work_dir: directory server job is using.
            test_queue: queue of tests.
            queue_lock: lock protecting test_queue.
        """
        threading.Thread.__init__(self)
        self._test_queue = test_queue
        self._test_queue_lock = queue_lock
        self._tests_run = 0
        self._machine = machine
        self._host = hosts.create_host(self._machine)
        self._client_at = autotest.Autotest(self._host)
        client_attributes = site_host_attributes.HostAttributes(machine)
        # Only attributes with truthy values count as present on the machine.
        self.attribute_set = set([key for key, value in
                                  client_attributes.__dict__.items() if value])
        self._results_dir = os.path.join(work_dir, self._machine)
        if not os.path.exists(self._results_dir):
            os.makedirs(self._results_dir)
        # status_version tells the tko parser which log format to expect.
        machine_data = {'hostname': self._machine,
                        'status_version': str(1)}
        utils.write_keyval(self._results_dir, machine_data)

    def __str__(self):
        attributes = [a for a in self.attribute_set]
        return '%s attributes=%s' % (self._machine, attributes)

    def get_test(self):
        """Return a test from the queue to run on this host.

        The test queue can be non-empty, but still not contain a test that is
        valid for this machine. This function will take exclusive access to
        the queue via _test_queue_lock and repeatedly pop tests off the queue
        until finding a valid test or depleting the queue. In either case if
        invalid tests have been popped from the queue, they are pushed back
        onto the queue before returning.

        Returns:
            test_item, or None if no more tests exist for this machine.
        """
        good_test = None
        skipped_tests = []

        with self._test_queue_lock:
            while True:
                try:
                    canidate_test = self._test_queue.get_nowait()
                    # Check if test is valid for this machine.
                    if canidate_test.validate(self.attribute_set):
                        good_test = canidate_test
                        break
                    # Not valid here -- remember it so it can go back on the
                    # queue for some other machine_worker to pick up.
                    skipped_tests.append(canidate_test)

                except Queue.Empty:
                    break

            # Return any skipped tests to the queue.
            for st in skipped_tests:
                self._test_queue.put(st)

        return good_test

    def run_subcommand(self, active_test):
        """Use subcommand to fork process and execute test."""
        sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
        sub_cmd.fork_start()
        sub_cmd.fork_waitfor()

    def subcommand_wrapper(self, active_test):
        """Callback for subcommand to call into with the test parameter."""
        self._client_at.run_test(active_test.test_name,
                                 results_dir=self._results_dir,
                                 **active_test.test_args)

    def run(self):
        """Executes tests on host machine.

        Uses subcommand to fork the process when running tests so unique
        client jobs talk to unique server jobs, which prevents log files from
        simultaneous tests interweaving with each other.
        """
        while True:
            active_test = self.get_test()
            if active_test is None:
                break

            logging.info('%s running %s', self._machine, active_test)
            try:
                self.run_subcommand(active_test)
            except (error.AutoservError, error.AutotestError):
                # Known autotest failures: log and keep draining the queue.
                logging.exception('Error running test "%s".', active_test)
            except Exception:
                # Unexpected failures abort this worker thread entirely.
                logging.exception('Exception running test "%s".', active_test)
                raise
            finally:
                # Always mark the queue item done and bump the counter, even
                # when the test raised.
                self._test_queue.task_done()
                self._tests_run += 1

        logging.info('%s completed %d tests.', self._machine, self._tests_run)