blob: 6b98dd28d658508b2d957205108275371a9dcee9 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
Eric Li861b2d52011-02-04 14:50:35 -08009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
Paul Pendleburyff1076d2011-03-31 14:45:32 -070010import Queue, threading
mblighfc3da5b2010-01-06 18:37:22 +000011import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
showard75cdfee2009-06-10 17:40:41 +000012from autotest_lib.client.bin import sysinfo
mbligh0d0f67d2009-11-06 03:15:03 +000013from autotest_lib.client.common_lib import base_job
mbligh09108442008-10-15 16:27:38 +000014from autotest_lib.client.common_lib import error, log, utils, packages
showard75cdfee2009-06-10 17:40:41 +000015from autotest_lib.client.common_lib import logging_manager
Paul Pendleburyf807c182011-04-05 11:24:34 -070016from autotest_lib.server import test, subcommand, profilers, server_job_utils
mbligh0a883702010-04-21 01:58:34 +000017from autotest_lib.server.hosts import abstract_ssh
jadmanski10646442008-08-13 14:05:21 +000018from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000019
20
mbligh084bc172008-10-18 14:02:45 +000021def _control_segment_path(name):
22 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000023 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000024 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000025
26
# Filenames for the control files a job writes into its control-file /
# results directory (see run() below).
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
# NOTE(review): not referenced in this chunk; presumably the per-job
# machine list file -- confirm against the rest of the file.
MACHINES_FILENAME = '.machines'

# Canned control segments implementing the standard job phases; each
# resolves to a file under server/control_segments/.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000039
40
mbligh062ed152009-01-13 00:57:14 +000041# by default provide a stub that generates no site data
42def _get_site_job_data_dummy(job):
43 return {}
44
45
# Load site-specific code for generating site-specific job data, falling
# back to the dummy hook above when no site implementation exists.
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)
jadmanski10646442008-08-13 14:05:21 +000050
51
class status_indenter(base_job.status_indenter):
    """A minimal status indenter backed by a plain integer counter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        """The current indentation level."""
        return self._indent


    def increment(self):
        """Raise the indentation level by one."""
        self._indent += 1


    def decrement(self):
        """Lower the indentation level by one."""
        self._indent -= 1


    def get_context(self):
        """Return a snapshot object whose restore() rewinds the indent.

        Used by job.get_record_context.
        """
        class context(object):
            def __init__(self, indenter, level):
                self._indenter = indenter
                self._level = level
            def restore(self):
                self._indenter._indent = self._level
        return context(self, self._indent)
80
81
class server_job_record_hook(object):
    """The job.record hook for the server job.

    Injects WARN messages from the console/vlm warning loggers whenever a
    new status log is written, and echoes every rendered status line to
    INFO-level logging. Implemented as a class so that instance state can
    block recursive invocations -- the hook itself calls job.record to
    emit the WARN lines.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._in_hook = False


    def __call__(self, entry):
        """Re-entrancy guard around the 'real' hook, _hook.

        Makes no attempt to be threadsafe; it exists only to break the
        potential job.record -> _hook -> job.record -> ... infinite
        recursion outright.
        """
        if self._in_hook:
            return
        self._in_hook = True
        try:
            self._hook(self._job, entry)
        finally:
            self._in_hook = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        # drain all warning loggers and record each new warning first
        pending = []
        for timestamp, msg in job._read_warnings():
            warning = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            pending.append(warning)
            job.record_entry(warning)
        pending.append(entry)
        # echo a rendered version of every entry to INFO and the parser
        for log_entry in pending:
            rendered = job._logger.render_entry(log_entry)
            logging.info(rendered)
            job._parse_status(rendered)
jadmanski2a89dac2010-06-11 14:32:58 +0000126
127
mbligh0d0f67d2009-11-06 03:15:03 +0000128class base_server_job(base_job.base_job):
129 """The server-side concrete implementation of base_job.
jadmanski10646442008-08-13 14:05:21 +0000130
mbligh0d0f67d2009-11-06 03:15:03 +0000131 Optional properties provided by this implementation:
132 serverdir
133 conmuxdir
134
135 num_tests_run
136 num_tests_failed
137
138 warning_manager
139 warning_loggers
jadmanski10646442008-08-13 14:05:21 +0000140 """
141
mbligh0d0f67d2009-11-06 03:15:03 +0000142 _STATUS_VERSION = 1
jadmanski10646442008-08-13 14:05:21 +0000143
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass='',
                 group_name='', tag='',
                 control_filename=SERVER_CONTROL_FILENAME):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames the job will run against.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        """
        # base_job.__init__ resolves resultdir via _find_resultdir below
        super(base_server_job, self).__init__(resultdir=resultdir)

        # NOTE(review): 'path' appears unused in this method.
        path = os.path.dirname(__file__)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        # fall back to the local username when no job user was supplied
        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename

        # take over stdout/stderr management for the whole process
        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        # basic keyvals describing this job run
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        # continuous parsing only works for single-machine jobs
        self._parse_job = parse_job
        self._using_parser = (self._parse_job and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        # ensure hosts created in forked subprocesses get closed on join
        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))
237
mbligh0d0f67d2009-11-06 03:15:03 +0000238
239 @classmethod
240 def _find_base_directories(cls):
241 """
242 Determine locations of autodir, clientdir and serverdir. Assumes
243 that this file is located within serverdir and uses __file__ along
244 with relative paths to resolve the location.
245 """
246 serverdir = os.path.abspath(os.path.dirname(__file__))
247 autodir = os.path.normpath(os.path.join(serverdir, '..'))
248 clientdir = os.path.join(autodir, 'client')
249 return autodir, clientdir, serverdir
250
251
252 def _find_resultdir(self, resultdir):
253 """
254 Determine the location of resultdir. For server jobs we expect one to
255 always be explicitly passed in to __init__, so just return that.
256 """
257 if resultdir:
258 return os.path.normpath(resultdir)
259 else:
260 return None
261
jadmanski550fdc22008-11-20 16:32:08 +0000262
jadmanski2a89dac2010-06-11 14:32:58 +0000263 def _get_status_logger(self):
264 """Return a reference to the status logger."""
265 return self._logger
266
267
jadmanskie432dd22009-01-30 15:04:51 +0000268 @staticmethod
269 def _load_control_file(path):
270 f = open(path)
271 try:
272 control_file = f.read()
273 finally:
274 f.close()
275 return re.sub('\r', '', control_file)
276
277
jadmanski550fdc22008-11-20 16:32:08 +0000278 def _register_subcommand_hooks(self):
mbligh2b92b862008-11-22 13:25:32 +0000279 """
280 Register some hooks into the subcommand modules that allow us
281 to properly clean up self.hosts created in forked subprocesses.
282 """
jadmanski550fdc22008-11-20 16:32:08 +0000283 def on_fork(cmd):
284 self._existing_hosts_on_fork = set(self.hosts)
285 def on_join(cmd):
286 new_hosts = self.hosts - self._existing_hosts_on_fork
287 for host in new_hosts:
288 host.close()
289 subcommand.subcommand.register_fork_hook(on_fork)
290 subcommand.subcommand.register_join_hook(on_join)
291
jadmanski10646442008-08-13 14:05:21 +0000292
    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.

        No-op unless continuous parsing was enabled in __init__
        (self._using_parser).
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log (opened unbuffered,
        # hence the third argument of 0)
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model)
        else:
            # job already present: point the model at the existing rows
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
319
320
321 def cleanup_parser(self):
mbligh2b92b862008-11-22 13:25:32 +0000322 """
323 This should be called after the server job is finished
jadmanski10646442008-08-13 14:05:21 +0000324 to carry out any remaining cleanup (e.g. flushing any
mbligh2b92b862008-11-22 13:25:32 +0000325 remaining test results to the results db)
326 """
mbligh0d0f67d2009-11-06 03:15:03 +0000327 if not self._using_parser:
jadmanski10646442008-08-13 14:05:21 +0000328 return
329 final_tests = self.parser.end()
330 for test in final_tests:
331 self.__insert_test(test)
mbligh0d0f67d2009-11-06 03:15:03 +0000332 self._using_parser = False
jadmanski10646442008-08-13 14:05:21 +0000333
334
335 def verify(self):
336 if not self.machines:
mbligh084bc172008-10-18 14:02:45 +0000337 raise error.AutoservError('No machines specified to verify')
mbligh0fce4112008-11-27 00:37:17 +0000338 if self.resultdir:
339 os.chdir(self.resultdir)
jadmanski10646442008-08-13 14:05:21 +0000340 try:
jadmanskicdd0c402008-09-19 21:21:31 +0000341 namespace = {'machines' : self.machines, 'job' : self,
mbligh0d0f67d2009-11-06 03:15:03 +0000342 'ssh_user' : self._ssh_user,
343 'ssh_port' : self._ssh_port,
344 'ssh_pass' : self._ssh_pass}
mbligh084bc172008-10-18 14:02:45 +0000345 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
jadmanski10646442008-08-13 14:05:21 +0000346 except Exception, e:
mbligh2b92b862008-11-22 13:25:32 +0000347 msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
jadmanski10646442008-08-13 14:05:21 +0000348 self.record('ABORT', None, None, msg)
349 raise
350
351
352 def repair(self, host_protection):
353 if not self.machines:
354 raise error.AutoservError('No machines specified to repair')
mbligh0fce4112008-11-27 00:37:17 +0000355 if self.resultdir:
356 os.chdir(self.resultdir)
jadmanski10646442008-08-13 14:05:21 +0000357 namespace = {'machines': self.machines, 'job': self,
mbligh0d0f67d2009-11-06 03:15:03 +0000358 'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,
359 'ssh_pass': self._ssh_pass,
jadmanski10646442008-08-13 14:05:21 +0000360 'protection_level': host_protection}
mbligh25c0b8c2009-01-24 01:44:17 +0000361
mbligh0931b0a2009-04-08 17:44:48 +0000362 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
jadmanski10646442008-08-13 14:05:21 +0000363
364
365 def precheck(self):
366 """
367 perform any additional checks in derived classes.
368 """
369 pass
370
371
372 def enable_external_logging(self):
mbligh2b92b862008-11-22 13:25:32 +0000373 """
374 Start or restart external logging mechanism.
jadmanski10646442008-08-13 14:05:21 +0000375 """
376 pass
377
378
379 def disable_external_logging(self):
mbligh2b92b862008-11-22 13:25:32 +0000380 """
381 Pause or stop external logging mechanism.
jadmanski10646442008-08-13 14:05:21 +0000382 """
383 pass
384
385
386 def use_external_logging(self):
mbligh2b92b862008-11-22 13:25:32 +0000387 """
388 Return True if external logging should be used.
jadmanski10646442008-08-13 14:05:21 +0000389 """
390 return False
391
392
    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple.

        Three cases:
          * parsing enabled + forking + logging: per-machine wrapper that
            rescopes this job object to a single machine and runs the
            continuous parser around the call;
          * multiple machines + logging: per-machine wrapper that only sets
            up the execution context and machine keyvals;
          * otherwise: function is returned unwrapped.

        NOTE(review): the wrappers mutate self (_parse_job, machines,
        _using_parser) -- presumably safe because parallel_simple runs them
        in forked subprocesses; confirm against subcommand.parallel_simple.
        """
        # forking happens unless this is a single-machine job running on
        # exactly its own machine list
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                # rescope the parse tag and machine list to this machine
                self._parse_job += "/" + machine
                self._using_parser = True
                self.machines = [machine]
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : machine,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper
420
421
422 def parallel_simple(self, function, machines, log=True, timeout=None,
423 return_results=False):
424 """
425 Run 'function' using parallel_simple, with an extra wrapper to handle
426 the necessary setup for continuous parsing, if possible. If continuous
427 parsing is already properly initialized then this should just work.
428
429 @param function: A callable to run in parallel given each machine.
430 @param machines: A list of machine names to be passed one per subcommand
431 invocation of function.
432 @param log: If True, output will be written to output in a subdirectory
433 named after each machine.
434 @param timeout: Seconds after which the function call should timeout.
435 @param return_results: If True instead of an AutoServError being raised
436 on any error a list of the results|exceptions from the function
437 called on each arg is returned. [default: False]
438
439 @raises error.AutotestError: If any of the functions failed.
440 """
441 wrapper = self._make_parallel_wrapper(function, machines, log)
442 return subcommand.parallel_simple(wrapper, machines,
443 log=log, timeout=timeout,
444 return_results=return_results)
445
446
447 def parallel_on_machines(self, function, machines, timeout=None):
448 """
showardcd5fac42009-07-06 20:19:43 +0000449 @param function: Called in parallel with one machine as its argument.
mbligh415dc212009-06-15 21:53:34 +0000450 @param machines: A list of machines to call function(machine) on.
451 @param timeout: Seconds after which the function call should timeout.
452
453 @returns A list of machines on which function(machine) returned
454 without raising an exception.
455 """
showardcd5fac42009-07-06 20:19:43 +0000456 results = self.parallel_simple(function, machines, timeout=timeout,
mbligh415dc212009-06-15 21:53:34 +0000457 return_results=True)
458 success_machines = []
459 for result, machine in itertools.izip(results, machines):
460 if not isinstance(result, Exception):
461 success_machines.append(machine)
462 return success_machines
jadmanski10646442008-08-13 14:05:21 +0000463
464
Paul Pendleburyff1076d2011-03-31 14:45:32 -0700465 def distribute_across_machines(self, tests, machines):
466 """Run each test in tests once using machines.
467
468 Instead of running each test on each machine like parallel_on_machines,
469 run each test once across all machines. Put another way, the total
470 number of tests run by parallel_on_machines is len(tests) *
471 len(machines). The number of tests run by distribute_across_machines is
472 len(tests).
473
474 Args:
475 tests: List of tests to run.
476 machines: list of machines to use.
477 """
478 # The Queue is thread safe, but since a machine may have to search
479 # through the queue to find a valid test the lock provides exclusive
480 # queue access for more than just the get call.
481 test_queue = Queue.Queue()
482 test_queue_lock = threading.Lock()
483
Paul Pendlebury1f6f3e72011-04-13 11:16:44 -0700484 machine_workers = [server_job_utils.machine_worker(self,
485 machine,
Paul Pendleburyf807c182011-04-05 11:24:34 -0700486 self.resultdir,
487 test_queue,
488 test_queue_lock)
Paul Pendleburyff1076d2011-03-31 14:45:32 -0700489 for machine in machines]
490
491 # To (potentially) speed up searching for valid tests create a list of
492 # unique attribute sets present in the machines for this job. If sets
493 # were hashable we could just use a dictionary for fast verification.
494 # This at least reduces the search space from the number of machines to
495 # the number of unique machines.
496 unique_machine_attributes = []
497 for mw in machine_workers:
498 if not mw.attribute_set in unique_machine_attributes:
499 unique_machine_attributes.append(mw.attribute_set)
500
501 # Only queue tests which are valid on at least one machine. Record
502 # skipped tests in the status.log file using record_skipped_test().
503 for test_entry in tests:
Paul Pendleburyf807c182011-04-05 11:24:34 -0700504 ti = server_job_utils.test_item(*test_entry)
Paul Pendleburyff1076d2011-03-31 14:45:32 -0700505 machine_found = False
506 for ma in unique_machine_attributes:
507 if ti.validate(ma):
508 test_queue.put(ti)
509 machine_found = True
510 break
511 if not machine_found:
512 self.record_skipped_test(ti)
513
514 # Run valid tests and wait for completion.
515 for worker in machine_workers:
516 worker.start()
517 test_queue.join()
518
519
520 def record_skipped_test(self, skipped_test, message=None):
521 """Insert a failure record into status.log for this test."""
522 msg = message
523 if msg is None:
524 msg = 'No valid machines found for test %s.' % skipped_test
525 logging.info(msg)
526 self.record('START', None, skipped_test.test_name)
527 self.record('INFO', None, skipped_test.test_name, msg)
528 self.record('END TEST_NA', None, skipped_test.test_name, msg)
529
530
    # sentinel for control_file_dir meaning "use a fresh temp directory"
    _USE_TEMP_DIR = object()
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        """Execute the job's control file and the standard phase segments.

        @param cleanup: run the cleanup control segment afterwards.
        @param install_before/install_after: run the install control segment
                before/after the control file.
        @param collect_crashdumps: collect crashdumps in the finally phase
                even when no exception escaped.
        @param namespace: extra names made visible to the control file.
                NOTE(review): mutable default, but it is copied below before
                being mutated, so the default dict is never modified.
        @param control: control file text; defaults to the contents of
                self.control (or '' when that is None).
        @param control_file_dir: where to write the control files; defaults
                to self.resultdir, or a temp dir for _USE_TEMP_DIR.
        @param only_collect_crashinfo: skip control file execution and only
                run crashinfo collection.
        """
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        # expose standard job state to the control file's namespace
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self._ssh_user
        namespace['ssh_port'] = self._ssh_port
        namespace['ssh_pass'] = self._ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        # assume crashinfo collection is needed until the control file
        # finishes without an escaping exception
        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    # client-side control: write it out and run the server
                    # wrapper segment around it
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occured, so we don't need to collect crashinfo
                collect_crashinfo = False
            except Exception, e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': str(e)})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            # only remove the uncollected-logs file this run created
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000645
646
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: url of the test to run.
        @param args, dargs: passed through to test.runtest; a 'tag' entry in
                dargs participates in building the tagged test name.

        @returns True if the test ran and recorded GOOD; False if it raised
                a TestBaseException (already recorded). Any other exception
                is re-raised with its original traceback.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        # NOTE(review): 'group' and 'outputdir' appear unused here, though
        # _make_test_outputdir is presumably called for its side effect of
        # creating the directory -- confirm.
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            # run the test, recording its outcome in the status log
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # re-raise non-test exceptions with the original traceback
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000680
681
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Records a START line, runs the function, then records the matching
        END line whose status reflects the outcome.

        @param name: group name used in the status log.
        @param subdir: results subdirectory to record against (may be None).
        @param function: the callable to run inside the group.
        @param args: positional arguments forwarded to the function.
        @param dargs: keyword arguments forwarded to the function.

        @returns A (result, exc_info) tuple: result is the function's return
                value (None on failure); exc_info is sys.exc_info() when the
                function raised a TestBaseException, otherwise None.
        @raises error.JobError: when the function raised something other
                than a TestBaseException (recorded as END ABORT).
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException, e:
            # Test failures end the group but are handed back to the caller
            # via exc_info rather than re-raised here.
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            # Any other exception aborts the group and fails the job.
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000702
703
704 def run_group(self, function, *args, **dargs):
705 """\
706 function:
707 subroutine to run
708 *args:
709 arguments for the function
710 """
711
712 name = function.__name__
713
714 # Allow the tag for the group to be specified.
715 tag = dargs.pop('tag', None)
716 if tag:
717 name = tag
718
jadmanskide292df2008-08-26 20:51:14 +0000719 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000720
721
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        @param reboot_func: a function that carries out the reboot.
        @param get_kernel_func: a function that returns a string
                representing the kernel version.

        @raises Whatever reboot_func raises; the failure is recorded as
                END FAIL before the exception propagates.
        """
        try:
            self.record('START', None, 'reboot')
            reboot_func()
        except Exception, e:
            # Record the failure (with traceback) before propagating it.
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
            raise
        else:
            # Only query the kernel version after a successful reboot.
            kernel = get_kernel_func()
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
744
745
jadmanskie432dd22009-01-30 15:04:51 +0000746 def run_control(self, path):
747 """Execute a control file found at path (relative to the autotest
748 path). Intended for executing a control file within a control file,
749 not for running the top-level job control file."""
750 path = os.path.join(self.autodir, path)
751 control_file = self._load_control_file(path)
mbligh0d0f67d2009-11-06 03:15:03 +0000752 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
jadmanskie432dd22009-01-30 15:04:51 +0000753
754
jadmanskic09fc152008-10-15 17:56:59 +0000755 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
mbligh4395bbd2009-03-25 19:34:17 +0000756 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
jadmanskic09fc152008-10-15 17:56:59 +0000757 on_every_test)
758
759
760 def add_sysinfo_logfile(self, file, on_every_test=False):
761 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
762
763
764 def _add_sysinfo_loggable(self, loggable, on_every_test):
765 if on_every_test:
766 self.sysinfo.test_loggables.add(loggable)
767 else:
768 self.sysinfo.boot_loggables.add(loggable)
769
770
jadmanski10646442008-08-13 14:05:21 +0000771 def _read_warnings(self):
jadmanskif37df842009-02-11 00:03:26 +0000772 """Poll all the warning loggers and extract any new warnings that have
773 been logged. If the warnings belong to a category that is currently
774 disabled, this method will discard them and they will no longer be
775 retrievable.
776
777 Returns a list of (timestamp, message) tuples, where timestamp is an
778 integer epoch timestamp."""
jadmanski10646442008-08-13 14:05:21 +0000779 warnings = []
780 while True:
781 # pull in a line of output from every logger that has
782 # output ready to be read
mbligh2b92b862008-11-22 13:25:32 +0000783 loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
jadmanski10646442008-08-13 14:05:21 +0000784 closed_loggers = set()
785 for logger in loggers:
786 line = logger.readline()
787 # record any broken pipes (aka line == empty)
788 if len(line) == 0:
789 closed_loggers.add(logger)
790 continue
jadmanskif37df842009-02-11 00:03:26 +0000791 # parse out the warning
792 timestamp, msgtype, msg = line.split('\t', 2)
793 timestamp = int(timestamp)
794 # if the warning is valid, add it to the results
795 if self.warning_manager.is_valid(timestamp, msgtype):
796 warnings.append((timestamp, msg.strip()))
jadmanski10646442008-08-13 14:05:21 +0000797
798 # stop listening to loggers that are closed
799 self.warning_loggers -= closed_loggers
800
801 # stop if none of the loggers have any output left
802 if not loggers:
803 break
804
805 # sort into timestamp order
806 warnings.sort()
807 return warnings
808
809
showardcc929362010-01-25 21:20:41 +0000810 def _unique_subdirectory(self, base_subdirectory_name):
811 """Compute a unique results subdirectory based on the given name.
812
813 Appends base_subdirectory_name with a number as necessary to find a
814 directory name that doesn't already exist.
815 """
816 subdirectory = base_subdirectory_name
817 counter = 1
818 while os.path.exists(os.path.join(self.resultdir, subdirectory)):
819 subdirectory = base_subdirectory_name + '.' + str(counter)
820 counter += 1
821 return subdirectory
822
823
jadmanski52053632010-06-11 21:08:10 +0000824 def get_record_context(self):
825 """Returns an object representing the current job.record context.
826
827 The object returned is an opaque object with a 0-arg restore method
828 which can be called to restore the job.record context (i.e. indentation)
829 to the current level. The intention is that it should be used when
830 something external which generate job.record calls (e.g. an autotest
831 client) can fail catastrophically and the server job record state
832 needs to be reset to its original "known good" state.
833
834 @return: A context object with a 0-arg restore() method."""
835 return self._indenter.get_context()
836
837
showardcc929362010-01-25 21:20:41 +0000838 def record_summary(self, status_code, test_name, reason='', attributes=None,
839 distinguishing_attributes=(), child_test_ids=None):
840 """Record a summary test result.
841
842 @param status_code: status code string, see
843 common_lib.log.is_valid_status()
844 @param test_name: name of the test
845 @param reason: (optional) string providing detailed reason for test
846 outcome
847 @param attributes: (optional) dict of string keyvals to associate with
848 this result
849 @param distinguishing_attributes: (optional) list of attribute names
850 that should be used to distinguish identically-named test
851 results. These attributes should be present in the attributes
852 parameter. This is used to generate user-friendly subdirectory
853 names.
854 @param child_test_ids: (optional) list of test indices for test results
855 used in generating this result.
856 """
857 subdirectory_name_parts = [test_name]
858 for attribute in distinguishing_attributes:
859 assert attributes
860 assert attribute in attributes, '%s not in %s' % (attribute,
861 attributes)
862 subdirectory_name_parts.append(attributes[attribute])
863 base_subdirectory_name = '.'.join(subdirectory_name_parts)
864
865 subdirectory = self._unique_subdirectory(base_subdirectory_name)
866 subdirectory_path = os.path.join(self.resultdir, subdirectory)
867 os.mkdir(subdirectory_path)
868
869 self.record(status_code, subdirectory, test_name,
870 status=reason, optional_fields={'is_summary': True})
871
872 if attributes:
873 utils.write_keyval(subdirectory_path, attributes)
874
875 if child_test_ids:
876 ids_string = ','.join(str(test_id) for test_id in child_test_ids)
877 summary_data = {'child_test_ids': ids_string}
878 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
879 summary_data)
880
881
jadmanski16a7ff72009-04-01 18:19:53 +0000882 def disable_warnings(self, warning_type):
jadmanskif37df842009-02-11 00:03:26 +0000883 self.warning_manager.disable_warnings(warning_type)
jadmanski16a7ff72009-04-01 18:19:53 +0000884 self.record("INFO", None, None,
885 "disabling %s warnings" % warning_type,
886 {"warnings.disable": warning_type})
jadmanskif37df842009-02-11 00:03:26 +0000887
888
jadmanski16a7ff72009-04-01 18:19:53 +0000889 def enable_warnings(self, warning_type):
jadmanskif37df842009-02-11 00:03:26 +0000890 self.warning_manager.enable_warnings(warning_type)
jadmanski16a7ff72009-04-01 18:19:53 +0000891 self.record("INFO", None, None,
892 "enabling %s warnings" % warning_type,
893 {"warnings.enable": warning_type})
jadmanskif37df842009-02-11 00:03:26 +0000894
895
jadmanski779bd292009-03-19 17:33:33 +0000896 def get_status_log_path(self, subdir=None):
897 """Return the path to the job status log.
898
899 @param subdir - Optional paramter indicating that you want the path
900 to a subdirectory status log.
901
902 @returns The path where the status log should be.
903 """
mbligh210bae62009-04-01 18:33:13 +0000904 if self.resultdir:
905 if subdir:
906 return os.path.join(self.resultdir, subdir, "status.log")
907 else:
908 return os.path.join(self.resultdir, "status.log")
jadmanski779bd292009-03-19 17:33:33 +0000909 else:
mbligh210bae62009-04-01 18:33:13 +0000910 return None
jadmanski779bd292009-03-19 17:33:33 +0000911
912
jadmanski6bb32d72009-03-19 20:25:24 +0000913 def _update_uncollected_logs_list(self, update_func):
914 """Updates the uncollected logs list in a multi-process safe manner.
915
916 @param update_func - a function that updates the list of uncollected
917 logs. Should take one parameter, the list to be updated.
918 """
mbligh0d0f67d2009-11-06 03:15:03 +0000919 if self._uncollected_log_file:
920 log_file = open(self._uncollected_log_file, "r+")
mbligha788dc42009-03-26 21:10:16 +0000921 fcntl.flock(log_file, fcntl.LOCK_EX)
jadmanski6bb32d72009-03-19 20:25:24 +0000922 try:
923 uncollected_logs = pickle.load(log_file)
924 update_func(uncollected_logs)
925 log_file.seek(0)
926 log_file.truncate()
927 pickle.dump(uncollected_logs, log_file)
jadmanski3bff9092009-04-22 18:09:47 +0000928 log_file.flush()
jadmanski6bb32d72009-03-19 20:25:24 +0000929 finally:
930 fcntl.flock(log_file, fcntl.LOCK_UN)
931 log_file.close()
932
933
934 def add_client_log(self, hostname, remote_path, local_path):
935 """Adds a new set of client logs to the list of uncollected logs,
936 to allow for future log recovery.
937
938 @param host - the hostname of the machine holding the logs
939 @param remote_path - the directory on the remote machine holding logs
940 @param local_path - the local directory to copy the logs into
941 """
942 def update_func(logs_list):
943 logs_list.append((hostname, remote_path, local_path))
944 self._update_uncollected_logs_list(update_func)
945
946
947 def remove_client_log(self, hostname, remote_path, local_path):
948 """Removes a set of client logs from the list of uncollected logs,
949 to allow for future log recovery.
950
951 @param host - the hostname of the machine holding the logs
952 @param remote_path - the directory on the remote machine holding logs
953 @param local_path - the local directory to copy the logs into
954 """
955 def update_func(logs_list):
956 logs_list.remove((hostname, remote_path, local_path))
957 self._update_uncollected_logs_list(update_func)
958
959
mbligh0d0f67d2009-11-06 03:15:03 +0000960 def get_client_logs(self):
961 """Retrieves the list of uncollected logs, if it exists.
962
963 @returns A list of (host, remote_path, local_path) tuples. Returns
964 an empty list if no uncollected logs file exists.
965 """
966 log_exists = (self._uncollected_log_file and
967 os.path.exists(self._uncollected_log_file))
968 if log_exists:
969 return pickle.load(open(self._uncollected_log_file))
970 else:
971 return []
972
973
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                       'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        # Propagate this job's ssh parameters to the host factory so hosts
        # created from control files inherit them.
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
mbligh084bc172008-10-18 14:02:45 +00001059
1060
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # Missing/unreadable machines file: treat as out of date.
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        # Run the control file with the prepared namespace as both its
        # globals and locals (Python 2 execfile).
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +00001092
1093
jadmanskie29d0e42010-06-17 16:06:52 +00001094 def _parse_status(self, new_line):
mbligh0d0f67d2009-11-06 03:15:03 +00001095 if not self._using_parser:
jadmanski10646442008-08-13 14:05:21 +00001096 return
jadmanskie29d0e42010-06-17 16:06:52 +00001097 new_tests = self.parser.process_lines([new_line])
jadmanski10646442008-08-13 14:05:21 +00001098 for test in new_tests:
1099 self.__insert_test(test)
1100
1101
    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues.

        @param test: a parsed test result object produced by the status
                log parser.
        """
        # The run/failure counters are updated regardless of whether the
        # database insert below succeeds.
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # Deliberately swallow database errors; just report them.
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
1118
mblighcaa62c22008-04-07 21:51:17 +00001119
mblighfc3da5b2010-01-06 18:37:22 +00001120 def preprocess_client_state(self):
1121 """
1122 Produce a state file for initializing the state of a client job.
1123
1124 Creates a new client state file with all the current server state, as
1125 well as some pre-set client state.
1126
1127 @returns The path of the file the state was written into.
1128 """
1129 # initialize the sysinfo state
1130 self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1131
1132 # dump the state out to a tempfile
1133 fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1134 os.close(fd)
mbligha2c99492010-01-27 22:59:50 +00001135
1136 # write_to_file doesn't need locking, we exclusively own file_path
mblighfc3da5b2010-01-06 18:37:22 +00001137 self._state.write_to_file(file_path)
1138 return file_path
1139
1140
    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards

        @param state_path A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError, e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')
1169
1170
mbligh0a883702010-04-21 01:58:34 +00001171 def clear_all_known_hosts(self):
1172 """Clears known hosts files for all AbstractSSHHosts."""
1173 for host in self.hosts:
1174 if isinstance(host, abstract_ssh.AbstractSSHHost):
1175 host.clear_known_hosts()
1176
1177
# Allow a site-specific implementation to extend the stock job class.
# base_server_job is passed as the default, so presumably import_site_class
# returns it unchanged when no site_server_job module exists -- TODO confirm.
site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)
jadmanski0afbb632008-06-06 21:10:57 +00001181
class server_job(site_server_job):
    """Concrete server job class; identical to site_server_job, which may
    carry site-specific extensions on top of base_server_job."""
    pass
jadmanskif37df842009-02-11 00:03:26 +00001184
1185
class warning_manager(object):
    """Tracks which warning categories are currently suppressed.

    For every warning type this records the time intervals during which
    that type was disabled; warnings whose timestamp falls inside such an
    interval are considered invalid and get discarded.
    """
    def __init__(self):
        # Maps warning type -> list of (start, end) epoch-second intervals
        # during which that type is disabled. An end of None means the
        # interval is still open (the type is currently disabled).
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Check whether a warning should be kept.

        A warning is invalid if its type was disabled at the moment it
        occurred, i.e. its timestamp falls inside one of the recorded
        disabled intervals for that type."""
        for start, end in self.disabled_warnings.get(warning_type, []):
            after_start = timestamp >= start
            before_end = end is None or timestamp < end
            if after_start and before_end:
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        already_disabled = intervals and intervals[-1][1] is None
        if not already_disabled:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        currently_disabled = intervals and intervals[-1][1] is None
        if currently_disabled:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))