"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback, tempfile

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.client.common_lib import utils, packages


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return open(script_file).read()
    else:
        return ""


preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
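
# Example (hypothetical): a site-specific site_job.get_site_job_data could
# return extra keyvals such as {'site': 'mtv', 'build': '1234'}; they get
# merged into the job keyval file written in base_server_job.__init__.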


class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The pathname of the control file.
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        machines
                a list of hostnames of the machines to run on
        client
                True if this is a client-side control file
        parse_job
                if given, the job tag to use when parsing results
                into the database
        ssh_user, ssh_port, ssh_pass
                ssh connection parameters for reaching the machines
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """Perform any additional checks in derived classes."""
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism."""
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism."""
        pass


    def use_external_logging(self):
        """Return True if external logging should be used."""
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
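
    # Typical use, as generated by the control segments above:
    #   job.parallel_simple(run_client, machines)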


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace,
                               namespace)

        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                self._execute_code(preamble + crashdumps, namespace,
                                   namespace)
            self.disable_external_logging()
        if reboot and machines:
            self._execute_code(preamble + reboot_segment, namespace,
                               namespace)
        if install_after and machines:
            self._execute_code(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag (optional keyword argument)
                tag to add to the test name
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
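
    # Typical use from a server-side control file (the test name here is
    # illustrative):
    #   job.run_test('sleeptest', tag='smoke')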


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        tag (optional keyword argument):
                a name to use for the group in place of the function name
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
                or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                   "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)
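
    # A rendered status line looks like the following (values illustrative;
    # the fields are separated by real tabs, shown here as " | "):
    #   GOOD | dbench | dbench | timestamp=1221512345 | localtime=Sep 15 19:15:02 | completed successfully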


    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, global_scope, local_scope):
        exec(code, global_scope, local_scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg



class log_collector(object):
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job and copies them into the server-side results dir. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # Copy all dirs in default to results_dir
        keyval_path = self._prepare_for_copying_logs()
        self.host.get_file(self.client_results_dir + '/',
                           self.server_results_dir)
        self._process_copied_logs(keyval_path)
        self._postprocess_copied_logs()
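
    # The keyval shuffle in the helpers below keeps the client-side keyval
    # file from clobbering the server-side one when the whole client results
    # directory is copied across.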


    def _prepare_for_copying_logs(self):
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We will squirrel away the client side keyval
                # and move it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
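
    # Example of a quoted status line as sent by the client (illustrative;
    # the tag between the colons names the subgroup the line belongs to):
    #   AUTOTEST_STATUS:0.1:GOOD\t----\tsleeptest\tcompleted successfully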

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)
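
    # For example, a line quoted with tag "0.1" ends up in
    # self.logs[0][1]["logs"], which _process_logs later walks in sorted
    # order to reconstruct the grouping.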


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass

class server_job(site_server_job, base_server_job):
    pass