"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.client.common_lib import utils, packages


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""

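# A minimal usage sketch: load_control_segment("verify") returns the text of
# <server_dir>/control_segments/verify, or "" if no such file exists.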

preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

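# A site_job module, if one exists, only needs to export get_site_job_data.
# An illustrative sketch (the keyval shown is made up):
#
#     def get_site_job_data(job):
#         return {'site_build': '1.2.3'}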

class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        site_testdir
            <autodir>/server/site_tests/
        control
            the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            hosts on which to run the job
        client
            True if a client-side control file
        parse_job
            if non-empty, the tag under which results are continuously
            parsed into the tko database (single-machine jobs only)
        ssh_user, ssh_port, ssh_pass
            ssh credentials used to reach the machines
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
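        # Three cases are handled below: forking with per-machine result
        # parsing, forking with only per-machine result directories, and
        # a plain single-machine invocation.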
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
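        # Overall flow: optionally install the machines first, execute the
        # control file (wrapped for client-side jobs), and finally, as
        # requested, collect crashdumps, reboot and/or reinstall.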
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace,
                               namespace)
        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                self._execute_code(preamble + crashdumps, namespace,
                                   namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace,
                                   namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run
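
        Example (illustrative; the test name is arbitrary):
            job.run_test('sleeptest', tag='quick')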
396 """
397
398 (group, testname) = self.pkgmgr.get_package_name(url, 'test')
jadmanski10646442008-08-13 14:05:21 +0000399
400 tag = dargs.pop('tag', None)
401 if tag:
jadmanskide292df2008-08-26 20:51:14 +0000402 testname += '.' + tag
403 subdir = testname
jadmanski10646442008-08-13 14:05:21 +0000404
405 outputdir = os.path.join(self.resultdir, subdir)
406 if os.path.exists(outputdir):
407 msg = ("%s already exists, test <%s> may have"
408 " already run with tag <%s>"
409 % (outputdir, testname, tag) )
410 raise error.TestError(msg)
411 os.mkdir(outputdir)
412
413 def group_func():
414 try:
415 test.runtest(self, url, tag, args, dargs)
416 except error.TestBaseException, e:
417 self.record(e.exit_status, subdir, testname, str(e))
418 raise
419 except Exception, e:
420 info = str(e) + "\n" + traceback.format_exc()
421 self.record('FAIL', subdir, testname, info)
422 raise
423 else:
424 self.record('GOOD', subdir, testname,
425 'completed successfully')
jadmanskide292df2008-08-26 20:51:14 +0000426
427 result, exc_info = self._run_group(testname, subdir, group_func)
428 if exc_info and isinstance(exc_info[1], error.TestBaseException):
429 return False
430 elif exc_info:
431 raise exc_info[0], exc_info[1], exc_info[2]
432 else:
433 return True
jadmanski10646442008-08-13 14:05:21 +0000434
435
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
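
        Example (illustrative; the function name is hypothetical):
            job.run_group(stress_host, tag='stress')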
469 """
470
471 name = function.__name__
472
473 # Allow the tag for the group to be specified.
474 tag = dargs.pop('tag', None)
475 if tag:
476 name = tag
477
jadmanskide292df2008-08-26 20:51:14 +0000478 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000479
480
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status.

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
            for valid status definitions

        subdir: MUST be a relevant subdirectory in the results,
            or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
            "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.record_prefix.

        Multiline messages have secondary lines prefaced by a double
        space ('  ').

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
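
        An example rendered line (illustrative values):

        GOOD\t----\treboot.verify\ttimestamp=1220000000\tlocaltime=Aug 29 08:53:20\tcompleted successfully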
541 """
542 # poll all our warning loggers for new warnings
543 warnings = self._read_warnings()
544 for timestamp, msg in warnings:
545 self._record("WARN", None, None, msg, timestamp)
546
547 # write out the actual status log line
548 self._record(status_code, subdir, operation, status,
549 optional_fields=optional_fields)
550
551
552 def _read_warnings(self):
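        # Each warning logger is a file object producing lines of the form
        # "<epoch time>\t<message>\n"; a zero-length read means the writer
        # closed its end of the pipe.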
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            # use search, not match, so embedded newlines/tabs are caught
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, global_scope, local_scope):
        exec(code, global_scope, local_scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
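
    Status lines arrive from the client quoted in the form (illustrative):

        AUTOTEST_STATUS:0.1:GOOD\t----\tsleeptest\tcompleted successfully

    Anything that does not match is passed through as ssh output.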
708 """
709 parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
710 extract_indent = re.compile(r"^(\t*).*$")
711
    def __init__(self, job):
        self.job = job
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired it can be reconstructed from the status
        log by looking at the timestamp lines."""
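        # e.g. (illustrative) with buffered lines under tags "1", "1.1" and
        # "2", the flush order is all of "1", then "1.1", then "2", even if
        # the lines originally arrived interleaved.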
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line, log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass