"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.client.common_lib import utils, packages


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""


preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, password, port = parse_machine(machine,
            ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, password=password)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, password, port = parse_machine(machine,
            ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False,
                         password=password)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    hostname, user, password, port = parse_machine(machine,
            ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False,
                         password=password)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, password, port = parse_machine(machine,
            ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False,
                         password=password)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

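# For illustration only: when a site_job module is present on the import
# path, it is expected to expose get_site_job_data(job) returning a dict of
# extra keyvals to merge into the job keyval file. A minimal, hypothetical
# site_job.py might look like:
#
#     def get_site_job_data(job):
#         return {'site_build': 'example-build-id'}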

class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        site_testdir
            <autodir>/server/site_tests/
        control
            the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of machine names this job will run against
        client
            True if a client-side control file
        parse_job
            job tag under which to do continuous results parsing, if any
        ssh_user, ssh_port, ssh_pass
            ssh credentials used to reach the machines
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label': label, 'user': user,
                    'hostname': ','.join(machines),
                    'status_version': str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

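    # Illustrative construction (paths and values are hypothetical):
    #
    #     job = base_server_job('/path/to/control', args=[],
    #                           resultdir='/tmp/results', label='smoke',
    #                           user='user@example.com', machines=['host1'],
    #                           client=True)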

    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines': self.machines, 'job': self,
                         'ssh_user': self.ssh_user,
                         'ssh_port': self.ssh_port,
                         'ssh_pass': self.ssh_pass}
            self._execute_code(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n' +
                   traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism."""
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism."""
        pass


    def use_external_logging(self):
        """Return True if external logging should be used."""
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)

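    # Typical use, mirroring the control segments above (run_client is a
    # hypothetical per-machine worker):
    #
    #     def run_client(machine):
    #         ...  # per-machine work
    #     job.parallel_simple(run_client, machines)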

    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace,
                               namespace)
        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                self._execute_code(preamble + crashdumps, namespace,
                                   namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace,
                                   namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace, namespace)

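    # e.g. a driver such as autoserv might invoke (flags illustrative):
    #
    #     job.run(reboot=True, install_before=False, collect_crashdumps=True)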

    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        subdir = testname

        tag = dargs.pop('tag', None)
        if tag:
            subdir += '.' + tag

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')
        self._run_group(testname, subdir, group_func)

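    # Example invocation (test name and tag are hypothetical):
    #
    #     job.run_test('sleeptest', tag='smoke')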

    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result = None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)

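    # Example (the grouped function is hypothetical):
    #
    #     def tune_and_measure():
    #         ...
    #     job.run_group(tune_and_measure, tag='pass1')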

    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})

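    # A sketch of the intended pairing, assuming a host object that exposes
    # reboot() and a kernel-version query:
    #
    #     job.run_reboot(host.reboot, host.get_kernel_ver)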

    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
                or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                   "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)

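    # For example, a passing dbench run yields a tab-separated status line
    # roughly like (timestamps illustrative):
    #
    #     GOOD  dbench  dbench  timestamp=1218637521  localtime=Aug 13 14:25:21  completed successfully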

    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings

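    # Each logger line is expected to be "<epoch>\t<message>", e.g. a console
    # logger might emit (message illustrative):
    #
    #     1218637521    WARNING: soft lockup detected on CPU#0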

    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, global_scope, local_scope):
        exec(code, global_scope, local_scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
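    # Quoted client lines arrive in the form (payload illustrative):
    #
    #     AUTOTEST_STATUS:<tag>:<pre-rendered status line>
    #
    # where <tag> is empty or a dotted sequence of integers such as "1.2".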

    def __init__(self, job):
        self.job = job
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the status
        log by looking at the timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line, log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass