"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
from autotest_lib.client.bin import fd_stack, sysinfo
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""


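# For illustration: load_control_segment("verify") returns the text of
# <server_dir>/control_segments/verify, or "" if no such file exists. The
# segment strings below are concatenated with `preamble` and executed via
# job._execute_code() when the job runs.
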
27preamble = """\
28import os, sys
29
30from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
31from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
32from autotest_lib.server import git_kernel
33from autotest_lib.server.subcommand import *
34from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
35from autotest_lib.server.utils import parse_machine
36from autotest_lib.client.common_lib.error import *
37from autotest_lib.client.common_lib import barrier
38
39autotest.Autotest.job = job
jadmanski1c5e3a12008-08-15 23:08:20 +000040hosts.Host.job = job
jadmanski10646442008-08-13 14:05:21 +000041barrier = barrier.barrier
42if len(machines) > 1:
43 open('.machines', 'w').write('\\n'.join(machines) + '\\n')
44"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""


crashinfo = """
def crashinfo(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashinfo(test_start_time)

job.parallel_simple(crashinfo, machines, log=False)
"""


reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

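# A site_job module, if present on the import path, only needs to export
# get_site_job_data(job) returning a dict of extra keyvals, e.g. (names and
# values invented for illustration):
#
#     def get_site_job_data(job):
#         return {'site_build': 'r1234'}
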

class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        site_testdir
            <autodir>/server/site_tests/
        control
            the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        client
            True if a client-side control file
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may. If
                # the problem was something other than the directory already
                # existing, this chmod should throw an exception as well.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label': label, 'user': user,
                    'hostname': ','.join(machines),
                    'status_version': str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


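    # A minimal construction sketch (values hypothetical); positional order
    # follows the signature above: control, args, resultdir, label, user,
    # machines:
    #
    #     job = server_job('control.srv', [], 'results/', 'smoke',
    #                      'me@example.com', ['host1'])
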
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines': self.machines, 'job': self,
                         'ssh_user': self.ssh_user,
                         'ssh_port': self.ssh_port,
                         'ssh_pass': self.ssh_pass}
            self._execute_code(preamble + verify, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism."""
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism."""
        pass


    def enable_test_cleanup(self):
        """By default tests run test.cleanup"""
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """By default tests do not run test.cleanup"""
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used."""
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    script = crashinfo  # includes crashdumps
                else:
                    script = crashdumps
                self._execute_code(preamble + script, namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace)


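    # A typical call (sketch; the driver normally plumbs these flags through
    # from its command line):
    #
    #     job.run(reboot=True, install_before=False, collect_crashdumps=True)
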
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


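    # Example (assuming a packaged test named 'sleeptest' is available):
    #
    #     passed = job.run_test('sleeptest', tag='quick')
    #
    # run_test returns True on success and False on an ordinary test
    # failure; non-test exceptions propagate to the caller.
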
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


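    # Example: run a helper under its own START/END group in the status log;
    # the function name, or tag= if given, becomes the group name:
    #
    #     def build_kernel():
    #         ...  # hypothetical helper
    #     job.run_group(build_kernel, tag='build')
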
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


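    # A usage sketch, assuming `host` is a remote host object whose run()
    # returns a result with a .stdout attribute:
    #
    #     job.run_reboot(host.reboot,
    #                    lambda: host.run('uname -r').stdout.strip())
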
    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


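    # Example: log /proc/interrupts once per boot, and the tail of dmesg
    # after every test:
    #
    #     job.add_sysinfo_command('cat /proc/interrupts',
    #                             logfile='interrupts')
    #     job.add_sysinfo_command('dmesg | tail', logfile='dmesg.tail',
    #                             on_every_test=True)
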
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                   "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


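    # For illustration, record('GOOD', 'dbench', 'dbench',
    # 'completed successfully') appends one tab-separated line to status.log,
    # roughly of the form (timestamp values invented):
    #
    #     GOOD\tdbench\tdbench\ttimestamp=1224000000\tlocaltime=Oct 14 16:40:00\tcompleted successfully
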
    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, scope):
        exec(code, scope, scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


class log_collector(object):
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """Collect all the current results of a running client job and
        copy them into the server-side results directory."""

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # copy all dirs in the client results dir to the server results dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to a temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # squirrel the client-side keyval away, and move it
                # back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


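    # self.logs is a nested dict keyed by the integer parts of the status
    # tag: a line quoted with tag "1.2" ends up appended to
    # self.logs[1][2]["logs"]; _process_log_dict then flattens the tree
    # depth-first in sorted key order.
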
    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the
        status log by looking at the timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass