"""
The main job wrapper for the server side.

This is the core infrastructure, derived from the client-side job.py.

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.client.common_lib import utils, packages


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return open(script_file).read()
    else:
        return ""


preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

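# Illustrative sketch (not executed; hostnames and commands are hypothetical):
# a minimal server-side control file run under the preamble above. The
# preamble injects 'job', 'machines' and the ssh_* settings into the control
# file's namespace, so a control file can fan work out like this:
#
#     def run(machine):
#         host = hosts.create_host(machine)
#         host.run('uname -a')
#
#     job.parallel_simple(run, machines)
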
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

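# Illustrative sketch (hypothetical module): a site_job.py placed on the
# import path only needs to define get_site_job_data(job) returning a dict
# of extra keyvals to be merged into the job's keyval file:
#
#     def get_site_job_data(job):
#         return {'site_build': 'example-build-1234'}  # assumed keyval name
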

class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        machines
                hostnames of the machines to run the job against
        client
                True if a client-side control file
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)

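    # Illustrative call (hypothetical function; 'hosts' is in scope inside
    # control files, not in this module): fork one subcommand per machine
    # and collect a command's output on each.
    #
    #     def check_uptime(machine):
    #         host = hosts.create_host(machine)
    #         return host.run('uptime').stdout
    #
    #     job.parallel_simple(check_uptime, machines)
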
    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace,
                               namespace)

        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                self._execute_code(preamble + crashdumps, namespace,
                                   namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace,
                                   namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True

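    # Illustrative calls (hypothetical test name): a tag keeps repeated runs
    # of the same test in distinct result subdirectories.
    #
    #     job.run_test('sleeptest')               # results in sleeptest/
    #     job.run_test('sleeptest', tag='again')  # results in sleeptest.again/
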

    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
                or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                   "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings

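    # Each warning logger is expected to produce lines of the form
    # "<epoch timestamp>\t<message>\n". A sketch of a producer, with a
    # hypothetical message:
    #
    #     logger.write('%d\tsoft lockup detected on cpu#1\n' % time.time())
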

    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            # use search, not match: a tab or newline anywhere in the
            # string would corrupt the tab-delimited status format
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'

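    # Example of a rendered record (hypothetical values): tab-delimited, with
    # the optional key=value fields between the operation and the status text:
    #
    #     GOOD\tdbench\tdbench\ttimestamp=1219875850\tlocaltime=Aug 27 14:24:10\tcompleted successfully
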

    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, global_scope, local_scope):
        exec(code, global_scope, local_scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")

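    # Example of a status line as emitted by an autotest client (hypothetical
    # content): the dotted tag between the colons encodes the nesting of
    # subcommands, e.g. tag "0.1" files the line under self.logs[0][1]:
    #
    #     AUTOTEST_STATUS:0.1:GOOD\t----\tsleep\tcompleted successfully
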
    def __init__(self, job):
        self.job = job
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the status
        log by looking at the timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty, make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass
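

# Illustrative sketch (hypothetical paths and hostnames, mirroring how
# autoserv drives this class): build a job and execute a server-side
# control file against a couple of machines.
#
#     machines = ['host1.example.com', 'host2.example.com']
#     job = server_job('/path/to/control.srv', args=[], resultdir='results',
#                      label='smoke', user='user@example.com',
#                      machines=machines)
#     job.verify()
#     job.run(reboot=True)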