blob: 2d32d8b9e295d6e28e4e07faca58cee4c5933876 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
jadmanski10646442008-08-13 14:05:21 +000014import os, sys, re, time, select, subprocess, traceback
15
16from autotest_lib.client.bin import fd_stack
17from autotest_lib.client.common_lib import error, logging
18from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20from autotest_lib.client.common_lib import utils, packages
21
22
23# load up a control segment
24# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    """Return the text of the control segment `name`.

    Segments live in <server_dir>/control_segments. If the segment file
    does not exist, return an empty string so that callers can blindly
    concatenate optional (e.g. site-specific) segments.
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        # use open() instead of the deprecated file() constructor, and
        # close the handle deterministically instead of leaking it
        segment = open(script_file)
        try:
            return segment.read()
        finally:
            segment.close()
    else:
        return ""
32
33
# Control-segment preamble: prepended to every chunk of control-file code
# run through _execute_code(). It executes inside the control namespace,
# so the free names `job`, `machines` and the ssh_* settings must already
# be present in that namespace (see base_server_job.run / verify / repair).
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
52
# Wrapper control segment used when the job was given a CLIENT-side control
# file: it pushes the control file to each machine via autotest.Autotest
# and runs it there, one run_client() call per machine in parallel.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
66
# Control segment that gathers kernel crashdumps newer than
# `test_start_time` from every machine; run with log=False so the
# collection itself does not pollute the status log.
crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
78
jadmanskicdd0c402008-09-19 21:21:31 +000079
# Control segment that gathers full crash information (a superset of
# crashdumps) from every machine; used when the control file terminated
# abnormally (see base_server_job.run).
crashinfo = """
def crashinfo(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashinfo(test_start_time)

job.parallel_simple(crashinfo, machines, log=False)
"""
91
92
jadmanski10646442008-08-13 14:05:21 +000093reboot_segment="""\
94def reboot(machine):
jadmanski1c5e3a12008-08-15 23:08:20 +000095 hostname, user, passwd, port = parse_machine(machine, ssh_user,
96 ssh_port, ssh_pass)
jadmanski10646442008-08-13 14:05:21 +000097
jadmanski8e72aaf2008-08-20 19:22:29 +000098 host = hosts.create_host(hostname, user=user, port=port,
jadmanski1c5e3a12008-08-15 23:08:20 +000099 initialize=False, password=passwd)
100 host.reboot()
jadmanski10646442008-08-13 14:05:21 +0000101
102job.parallel_simple(reboot, machines, log=False)
103"""
104
# Control segment that (re)installs the OS on every machine; used for the
# install_before / install_after options of base_server_job.run.
install="""\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
116
# load up the verifier control segment, with an optional site-specific hook
# (the site segment runs first; a missing segment contributes "")
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
135
136
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # version of the status-log format written by this job; consumed by
    # the TKO parser (status_lib.parser)
    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of); may be None
        args
                args to pass to the control file
        resultdir
                where to throw the results (created if missing)
        label
                label for the job
        user
                Username for the job (email address)
        machines
                list of machine hostnames the job runs against
        client
                True if a client-side control file
        parse_job
                job tag for continuous result parsing ('' disables it)
        ssh_user/ssh_port/ssh_pass
                ssh credentials handed through to the control namespace
        """
        # derive all the autotest directory paths relative to this file
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # slurp the control file, normalizing DOS line endings
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # current indentation prefix for status-log grouping
        self.record_prefix = ''
        # file-like console loggers polled for WARN lines (see record())
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True

        # capture stdout/stderr so they can be redirected per-subcommand
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        # start from a clean status file and publish the job keyvals
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        # continuous parsing only works for single-machine jobs here;
        # multi-machine jobs set it up per-fork in parallel_simple()
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')
230
231
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (unbuffered, mode 0)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # job already known: attach the existing indexes to our model
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
256
257
    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        # no-op (and safe to call repeatedly) when parsing is disabled
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False
268
269
    def verify(self):
        """Run the 'verify' control segment against all machines.

        Records an ABORT in the status log and re-raises on any failure.
        """
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise
285
286
    def repair(self, host_protection):
        """Run the 'repair' control segment, then re-verify the machines.

        host_protection: protection level exposed to the repair segment
        as `protection_level`.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()
301
302
    def precheck(self):
        """
        Perform any additional checks in derived classes (no-op here).
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        Hook for derived classes; no-op by default.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.
        Hook for derived classes; no-op by default.
        """
        pass


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.
        Always False here; derived classes may override.
        """
        return False
336
337
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # forking happens unless this is a single-machine job running on
        # exactly its own machine list
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                # NOTE: these mutations run in the forked child created by
                # subcommand.parallel_simple, so they do not leak into the
                # parent job object
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                # per-machine results subdirectory, but no parsing
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
366
367
368 def run(self, reboot = False, install_before = False,
369 install_after = False, collect_crashdumps = True,
370 namespace = {}):
371 # use a copy so changes don't affect the original dictionary
372 namespace = namespace.copy()
373 machines = self.machines
374
375 self.aborted = False
376 namespace['machines'] = machines
377 namespace['args'] = self.args
378 namespace['job'] = self
379 namespace['ssh_user'] = self.ssh_user
380 namespace['ssh_port'] = self.ssh_port
381 namespace['ssh_pass'] = self.ssh_pass
382 test_start_time = int(time.time())
383
384 os.chdir(self.resultdir)
385
386 self.enable_external_logging()
387 status_log = os.path.join(self.resultdir, 'status.log')
jadmanskicdd0c402008-09-19 21:21:31 +0000388 collect_crashinfo = True
jadmanski10646442008-08-13 14:05:21 +0000389 try:
390 if install_before and machines:
jadmanskicdd0c402008-09-19 21:21:31 +0000391 self._execute_code(preamble + install, namespace)
jadmanski10646442008-08-13 14:05:21 +0000392 if self.client:
393 namespace['control'] = self.control
394 open('control', 'w').write(self.control)
395 open('control.srv', 'w').write(client_wrapper)
396 server_control = client_wrapper
397 else:
398 open('control.srv', 'w').write(self.control)
399 server_control = self.control
jadmanskicdd0c402008-09-19 21:21:31 +0000400 self._execute_code(preamble + server_control, namespace)
jadmanski10646442008-08-13 14:05:21 +0000401
jadmanskicdd0c402008-09-19 21:21:31 +0000402 # disable crashinfo collection if we get this far without error
403 collect_crashinfo = False
jadmanski10646442008-08-13 14:05:21 +0000404 finally:
jadmanskicdd0c402008-09-19 21:21:31 +0000405 if machines and (collect_crashdumps or collect_crashinfo):
jadmanski10646442008-08-13 14:05:21 +0000406 namespace['test_start_time'] = test_start_time
jadmanskicdd0c402008-09-19 21:21:31 +0000407 if collect_crashinfo:
408 script = crashinfo # includes crashdumps
409 else:
410 script = crashdumps
411 self._execute_code(preamble + script, namespace)
jadmanski10646442008-08-13 14:05:21 +0000412 self.disable_external_logging()
413 if reboot and machines:
jadmanskicdd0c402008-09-19 21:21:31 +0000414 self._execute_code(preamble + reboot_segment, namespace)
jadmanski10646442008-08-13 14:05:21 +0000415 if install_after and machines:
jadmanskicdd0c402008-09-19 21:21:31 +0000416 self._execute_code(preamble + install, namespace)
jadmanski10646442008-08-13 14:05:21 +0000417
418
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag (keyword, popped from dargs)
                tag to add to testname

        Returns True on success, False if the test raised a
        TestBaseException; any other exception is re-raised.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        # refuse to clobber the results of a previous run with this tag
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # run the test and record its outcome in the status log
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            # test-level failures are reported via the return value
            return False
        elif exc_info:
            # re-raise non-test exceptions with the original traceback
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000464
465
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Returns (result, exc_info): exc_info is None on success, the
        sys.exc_info() triple if a TestBaseException was raised. Any
        other exception is converted into a JobError after recording
        an END ABORT line.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            # everything inside the group is indented one extra tab
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000491
492
493 def run_group(self, function, *args, **dargs):
494 """\
495 function:
496 subroutine to run
497 *args:
498 arguments for the function
499 """
500
501 name = function.__name__
502
503 # Allow the tag for the group to be specified.
504 tag = dargs.pop('tag', None)
505 if tag:
506 name = tag
507
jadmanskide292df2008-08-26 20:51:14 +0000508 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000509
510
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # failed reboot: restore indentation and log END FAIL
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # successful reboot: capture the (possibly new) kernel version
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
537
538
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)
580
581
    def _read_warnings(self):
        """Drain all pending warning lines from the console loggers.

        Returns a list of (epoch_timestamp, message) tuples sorted by
        timestamp. Loggers whose pipe has closed are dropped from
        self.warning_loggers.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each line is "<epoch>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
609
610
611 def _render_record(self, status_code, subdir, operation, status='',
612 epoch_time=None, record_prefix=None,
613 optional_fields=None):
614 """
615 Internal Function to generate a record to be written into a
616 status log. For use by server_job.* classes only.
617 """
618 if subdir:
619 if re.match(r'[\n\t]', subdir):
620 raise ValueError(
621 'Invalid character in subdir string')
622 substr = subdir
623 else:
624 substr = '----'
625
626 if not logging.is_valid_status(status_code):
627 raise ValueError('Invalid status code supplied: %s' %
628 status_code)
629 if not operation:
630 operation = '----'
631 if re.match(r'[\n\t]', operation):
632 raise ValueError(
633 'Invalid character in operation string')
634 operation = operation.rstrip()
635 status = status.rstrip()
636 status = re.sub(r"\t", " ", status)
637 # Ensure any continuation lines are marked so we can
638 # detect them in the status file to ensure it is parsable.
639 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
640
641 if not optional_fields:
642 optional_fields = {}
643
644 # Generate timestamps for inclusion in the logs
645 if epoch_time is None:
646 epoch_time = int(time.time())
647 local_time = time.localtime(epoch_time)
648 optional_fields["timestamp"] = str(epoch_time)
649 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
650 local_time)
651
652 fields = [status_code, substr, operation]
653 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
654 fields.append(status)
655
656 if record_prefix is None:
657 record_prefix = self.record_prefix
658
659 msg = '\t'.join(str(x) for x in fields)
660
661 return record_prefix + msg + '\n'
662
663
    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        # feed the indented lines to the continuous parser (if enabled)
        self.__parse_status(lines)
681
682
jadmanskicdd0c402008-09-19 21:21:31 +0000683 def _execute_code(self, code, scope):
684 exec(code, scope, scope)
jadmanski10646442008-08-13 14:05:21 +0000685
686
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # write the line to stdout, the job-level status.log and (when a
        # subdir is given) that test's own status.log
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())
708
709
    def __parse_status(self, new_lines):
        """Feed freshly written status lines to the continuous parser
        and push any newly completed tests into the results db."""
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)
716
717
    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # deliberately swallow db errors; just warn on stderr
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
730
mblighcaa62c22008-04-07 21:51:17 +0000731
jadmanskib6eb2f12008-09-12 16:39:36 +0000732
jadmanskia1f3c202008-09-15 19:17:16 +0000733class log_collector(object):
734 def __init__(self, host, client_tag, results_dir):
735 self.host = host
736 if not client_tag:
737 client_tag = "default"
738 self.client_results_dir = os.path.join(host.get_autodir(), "results",
739 client_tag)
740 self.server_results_dir = results_dir
741
742
jadmanskib6eb2f12008-09-12 16:39:36 +0000743 def collect_client_job_results(self):
744 """ A method that collects all the current results of a running
745 client job into the results dir. By default does nothing as no
746 client job is running, but when running a client job you can override
747 this with something that will actually do something. """
jadmanskia1f3c202008-09-15 19:17:16 +0000748
749 # make an effort to wait for the machine to come up
750 try:
751 self.host.wait_up(timeout=30)
752 except error.AutoservError:
753 # don't worry about any errors, we'll try and
754 # get the results anyway
755 pass
756
757
758 # Copy all dirs in default to results_dir
759 keyval_path = self._prepare_for_copying_logs()
760 self.host.get_file(self.client_results_dir + '/',
761 self.server_results_dir)
762 self._process_copied_logs(keyval_path)
763 self._postprocess_copied_logs()
764
765
766 def _prepare_for_copying_logs(self):
767 server_keyval = os.path.join(self.server_results_dir, 'keyval')
768 if not os.path.exists(server_keyval):
769 # Client-side keyval file can be copied directly
770 return
771
772 # Copy client-side keyval to temporary location
773 suffix = '.keyval_%s' % self.host.hostname
774 fd, keyval_path = tempfile.mkstemp(suffix)
775 os.close(fd)
776 try:
777 client_keyval = os.path.join(self.client_results_dir, 'keyval')
778 try:
779 self.host.get_file(client_keyval, keyval_path)
780 finally:
781 # We will squirrel away the client side keyval
782 # away and move it back when we are done
783 remote_temp_dir = self.host.get_tmp_dir()
784 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
785 self.host.run('mv %s %s' % (client_keyval,
786 self.temp_keyval_path))
787 except (error.AutoservRunError, error.AutoservSSHTimeout):
788 print "Prepare for copying logs failed"
789 return keyval_path
790
791
792 def _process_copied_logs(self, keyval_path):
793 if not keyval_path:
794 # Client-side keyval file was copied directly
795 return
796
797 # Append contents of keyval_<host> file to keyval file
798 try:
799 # Read in new and old keyval files
800 new_keyval = utils.read_keyval(keyval_path)
801 old_keyval = utils.read_keyval(self.server_results_dir)
802 # 'Delete' from new keyval entries that are in both
803 tmp_keyval = {}
804 for key, val in new_keyval.iteritems():
805 if key not in old_keyval:
806 tmp_keyval[key] = val
807 # Append new info to keyval file
808 utils.write_keyval(self.server_results_dir, tmp_keyval)
809 # Delete keyval_<host> file
810 os.remove(keyval_path)
811 except IOError:
812 print "Process copied logs failed"
813
814
815 def _postprocess_copied_logs(self):
816 # we can now put our keyval file back
817 client_keyval = os.path.join(self.client_results_dir, 'keyval')
818 try:
819 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
820 except Exception:
821 pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000822
823
mbligh57e78662008-06-17 19:53:49 +0000824# a file-like object for catching stderr from an autotest client and
825# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.

    Lines tagged AUTOTEST_STATUS are routed into the status log (with
    sub-job buffering by tag); AUTOTEST_TEST_COMPLETE triggers result
    collection and acknowledges the client via its fifo; everything
    else is treated as ssh noise and just printed.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        # partial last line not yet terminated by '\n'
        self.leftover = ""
        # most recent line pushed to the status log (for warning indent)
        self.last_line = ""
        # nested dict of buffered sub-job log lines, keyed by tag parts
        self.logs = {}


    def _process_log_dict(self, log_dict):
        # flatten (and empty) the nested log dict into an ordered list
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # descend/create the nested dict for this dotted tag
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            # test finished: pull results, then wake the waiting client
            # by writing into the fifo path it announced
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        # flush any partial trailing line and all buffered sub-job logs
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
964
965
mblighcaa62c22008-04-07 21:51:17 +0000966# site_server_job.py may be non-existant or empty, make sure that an
967# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site-specific job class available; fall back to an empty mixin
    class site_server_job(object):
        pass

# the public job class: site-specific behavior layered over the base job
class server_job(site_server_job, base_server_job):
    pass
jadmanski0afbb632008-06-06 21:10:57 +0000975 pass