blob: b2733cb64ec7437200b0c5131f534223e87af74f [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
jadmanski10646442008-08-13 14:05:21 +000014import os, sys, re, time, select, subprocess, traceback
15
16from autotest_lib.client.bin import fd_stack
17from autotest_lib.client.common_lib import error, logging
18from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20from autotest_lib.client.common_lib import utils, packages
21
22
# Control segments are snippets of control-file code that live in
# <server_dir>/control_segments, where <server_dir> is the directory
# containing this module.
def load_control_segment(name):
    """Return the text of the control segment called `name`.

    name
            file name of the segment inside <server_dir>/control_segments

    Returns the contents of the file, or an empty string when no such
    file exists (which lets optional, site-specific segments be absent).
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if not os.path.exists(script_file):
        return ""
    # close the handle explicitly instead of relying on garbage collection
    segment = open(script_file)
    try:
        return segment.read()
    finally:
        segment.close()
32
33
# Common header that is prepended to every control segment before it is
# executed (see the `preamble + ...` calls in base_server_job). The code
# expects the names `job` and `machines` to already exist in the namespace
# it is executed in. It attaches the job to the Autotest and Host classes
# and, for multi-machine jobs, writes the machine list to '.machines' in
# the current directory.
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
52
# Server-side control code used to run a client-side control file: it is
# executed (after the preamble) by base_server_job.run() when the job was
# created with client=True. Besides the preamble's names it expects
# `control`, `ssh_user`, `ssh_port` and `ssh_pass` in its namespace.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
65
# Control code that collects crashdumps from every machine; executed by
# base_server_job.run() once the main control code has finished. It
# expects `test_start_time` (plus the ssh_* settings) in its namespace.
crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
77
# Control code that reboots every machine; executed by
# base_server_job.run() when it is called with reboot=True.
reboot_segment="""\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""
89
# Control code that (re)installs every machine via host.machine_install();
# executed by base_server_job.run() for install_before / install_after.
install="""\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
101
# The verify control segment: the optional site-specific hook comes first
# and is followed by the standard verifier. A missing segment contributes
# an empty string.
verify = load_control_segment("site_verify") + load_control_segment("verify")

# The repair control segment, assembled the same way: optional
# site-specific hook first, standard repair code second.
repair = load_control_segment("site_repair") + load_control_segment("repair")
109
110
# Site-specific job data: use site_job.get_site_job_data when a site_job
# module can be imported, otherwise fall back to a stub.
try:
    import site_job
except ImportError:
    def get_site_job_data(job):
        """Default hook: contribute no site-specific job data."""
        return {}
else:
    get_site_job_data = site_job.get_site_job_data
    # only the function is wanted; do not leave the module name behind
    del site_job
120
121
122class base_server_job(object):
123 """The actual job against which we do everything.
124
125 Properties:
126 autodir
127 The top level autotest directory (/usr/local/autotest).
128 serverdir
129 <autodir>/server/
130 clientdir
131 <autodir>/client/
132 conmuxdir
133 <autodir>/conmux/
134 testdir
135 <autodir>/server/tests/
136 site_testdir
137 <autodir>/server/site_tests/
138 control
139 the control file for this job
140 """
141
142 STATUS_VERSION = 1
143
144
145 def __init__(self, control, args, resultdir, label, user, machines,
146 client=False, parse_job='',
147 ssh_user='root', ssh_port=22, ssh_pass=''):
148 """
149 control
150 The control file (pathname of)
151 args
152 args to pass to the control file
153 resultdir
154 where to throw the results
155 label
156 label for the job
157 user
158 Username for the job (email address)
159 client
160 True if a client-side control file
161 """
162 path = os.path.dirname(__file__)
163 self.autodir = os.path.abspath(os.path.join(path, '..'))
164 self.serverdir = os.path.join(self.autodir, 'server')
165 self.testdir = os.path.join(self.serverdir, 'tests')
166 self.site_testdir = os.path.join(self.serverdir, 'site_tests')
167 self.tmpdir = os.path.join(self.serverdir, 'tmp')
168 self.conmuxdir = os.path.join(self.autodir, 'conmux')
169 self.clientdir = os.path.join(self.autodir, 'client')
170 self.toolsdir = os.path.join(self.autodir, 'client/tools')
171 if control:
172 self.control = open(control, 'r').read()
173 self.control = re.sub('\r', '', self.control)
174 else:
175 self.control = None
176 self.resultdir = resultdir
177 if not os.path.exists(resultdir):
178 os.mkdir(resultdir)
179 self.debugdir = os.path.join(resultdir, 'debug')
180 if not os.path.exists(self.debugdir):
181 os.mkdir(self.debugdir)
182 self.status = os.path.join(resultdir, 'status')
183 self.label = label
184 self.user = user
185 self.args = args
186 self.machines = machines
187 self.client = client
188 self.record_prefix = ''
189 self.warning_loggers = set()
190 self.ssh_user = ssh_user
191 self.ssh_port = ssh_port
192 self.ssh_pass = ssh_pass
193
194 self.stdout = fd_stack.fd_stack(1, sys.stdout)
195 self.stderr = fd_stack.fd_stack(2, sys.stderr)
196
197 if os.path.exists(self.status):
198 os.unlink(self.status)
199 job_data = {'label' : label, 'user' : user,
200 'hostname' : ','.join(machines),
201 'status_version' : str(self.STATUS_VERSION)}
202 job_data.update(get_site_job_data(self))
203 utils.write_keyval(self.resultdir, job_data)
204
205 self.parse_job = parse_job
206 if self.parse_job and len(machines) == 1:
207 self.using_parser = True
208 self.init_parser(resultdir)
209 else:
210 self.using_parser = False
211 self.pkgmgr = packages.PackageManager(
212 self.autodir, run_function_dargs={'timeout':600})
213 self.pkgdir = os.path.join(self.autodir, 'packages')
214
215
216 def init_parser(self, resultdir):
217 """Start the continuous parsing of resultdir. This sets up
218 the database connection and inserts the basic job object into
219 the database if necessary."""
220 # redirect parser debugging to .parse.log
221 parse_log = os.path.join(resultdir, '.parse.log')
222 parse_log = open(parse_log, 'w', 0)
223 tko_utils.redirect_parser_debugging(parse_log)
224 # create a job model object and set up the db
225 self.results_db = tko_db.db(autocommit=True)
226 self.parser = status_lib.parser(self.STATUS_VERSION)
227 self.job_model = self.parser.make_job(resultdir)
228 self.parser.start(self.job_model)
229 # check if a job already exists in the db and insert it if
230 # it does not
231 job_idx = self.results_db.find_job(self.parse_job)
232 if job_idx is None:
233 self.results_db.insert_job(self.parse_job,
234 self.job_model)
235 else:
236 machine_idx = self.results_db.lookup_machine(
237 self.job_model.machine)
238 self.job_model.index = job_idx
239 self.job_model.machine_idx = machine_idx
240
241
242 def cleanup_parser(self):
243 """This should be called after the server job is finished
244 to carry out any remaining cleanup (e.g. flushing any
245 remaining test results to the results db)"""
246 if not self.using_parser:
247 return
248 final_tests = self.parser.end()
249 for test in final_tests:
250 self.__insert_test(test)
251 self.using_parser = False
252
253
254 def verify(self):
255 if not self.machines:
256 raise error.AutoservError(
257 'No machines specified to verify')
258 try:
259 namespace = {'machines' : self.machines, 'job' : self, \
260 'ssh_user' : self.ssh_user, \
261 'ssh_port' : self.ssh_port, \
262 'ssh_pass' : self.ssh_pass}
263 self._execute_code(preamble + verify, namespace, namespace)
264 except Exception, e:
265 msg = ('Verify failed\n' + str(e) + '\n'
266 + traceback.format_exc())
267 self.record('ABORT', None, None, msg)
268 raise
269
270
271 def repair(self, host_protection):
272 if not self.machines:
273 raise error.AutoservError('No machines specified to repair')
274 namespace = {'machines': self.machines, 'job': self,
275 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
276 'ssh_pass': self.ssh_pass,
277 'protection_level': host_protection}
278 # no matter what happens during repair, go on to try to reverify
279 try:
280 self._execute_code(preamble + repair, namespace, namespace)
281 except Exception, exc:
282 print 'Exception occured during repair'
283 traceback.print_exc()
284 self.verify()
285
286
287 def precheck(self):
288 """
289 perform any additional checks in derived classes.
290 """
291 pass
292
293
294 def enable_external_logging(self):
295 """Start or restart external logging mechanism.
296 """
297 pass
298
299
300 def disable_external_logging(self):
301 """ Pause or stop external logging mechanism.
302 """
303 pass
304
305
306 def use_external_logging(self):
307 """Return True if external logging should be used.
308 """
309 return False
310
311
312 def parallel_simple(self, function, machines, log=True, timeout=None):
313 """Run 'function' using parallel_simple, with an extra
314 wrapper to handle the necessary setup for continuous parsing,
315 if possible. If continuous parsing is already properly
316 initialized then this should just work."""
317 is_forking = not (len(machines) == 1 and
318 self.machines == machines)
jadmanski4dd1a002008-09-05 20:27:30 +0000319 if self.parse_job and is_forking and log:
jadmanski10646442008-08-13 14:05:21 +0000320 def wrapper(machine):
321 self.parse_job += "/" + machine
322 self.using_parser = True
323 self.machines = [machine]
324 self.resultdir = os.path.join(self.resultdir,
325 machine)
jadmanski609a5f42008-08-26 20:52:42 +0000326 os.chdir(self.resultdir)
jadmanski10646442008-08-13 14:05:21 +0000327 self.init_parser(self.resultdir)
328 result = function(machine)
329 self.cleanup_parser()
330 return result
jadmanski4dd1a002008-09-05 20:27:30 +0000331 elif len(machines) > 1 and log:
jadmanski10646442008-08-13 14:05:21 +0000332 def wrapper(machine):
333 self.resultdir = os.path.join(self.resultdir, machine)
jadmanski609a5f42008-08-26 20:52:42 +0000334 os.chdir(self.resultdir)
jadmanski10646442008-08-13 14:05:21 +0000335 result = function(machine)
336 return result
337 else:
338 wrapper = function
339 subcommand.parallel_simple(wrapper, machines, log, timeout)
340
341
342 def run(self, reboot = False, install_before = False,
343 install_after = False, collect_crashdumps = True,
344 namespace = {}):
345 # use a copy so changes don't affect the original dictionary
346 namespace = namespace.copy()
347 machines = self.machines
348
349 self.aborted = False
350 namespace['machines'] = machines
351 namespace['args'] = self.args
352 namespace['job'] = self
353 namespace['ssh_user'] = self.ssh_user
354 namespace['ssh_port'] = self.ssh_port
355 namespace['ssh_pass'] = self.ssh_pass
356 test_start_time = int(time.time())
357
358 os.chdir(self.resultdir)
359
360 self.enable_external_logging()
361 status_log = os.path.join(self.resultdir, 'status.log')
362 try:
363 if install_before and machines:
364 self._execute_code(preamble + install, namespace, namespace)
365 if self.client:
366 namespace['control'] = self.control
367 open('control', 'w').write(self.control)
368 open('control.srv', 'w').write(client_wrapper)
369 server_control = client_wrapper
370 else:
371 open('control.srv', 'w').write(self.control)
372 server_control = self.control
373 self._execute_code(preamble + server_control, namespace,
374 namespace)
375
376 finally:
377 if machines and collect_crashdumps:
378 namespace['test_start_time'] = test_start_time
379 self._execute_code(preamble + crashdumps, namespace,
380 namespace)
381 self.disable_external_logging()
382 if reboot and machines:
383 self._execute_code(preamble + reboot_segment,namespace,
384 namespace)
385 if install_after and machines:
386 self._execute_code(preamble + install, namespace, namespace)
387
388
389 def run_test(self, url, *args, **dargs):
390 """Summon a test object and run it.
391
392 tag
393 tag to add to testname
394 url
395 url of the test to run
396 """
397
398 (group, testname) = self.pkgmgr.get_package_name(url, 'test')
jadmanski10646442008-08-13 14:05:21 +0000399
400 tag = dargs.pop('tag', None)
401 if tag:
jadmanskide292df2008-08-26 20:51:14 +0000402 testname += '.' + tag
403 subdir = testname
jadmanski10646442008-08-13 14:05:21 +0000404
405 outputdir = os.path.join(self.resultdir, subdir)
406 if os.path.exists(outputdir):
407 msg = ("%s already exists, test <%s> may have"
408 " already run with tag <%s>"
409 % (outputdir, testname, tag) )
410 raise error.TestError(msg)
411 os.mkdir(outputdir)
412
413 def group_func():
414 try:
415 test.runtest(self, url, tag, args, dargs)
416 except error.TestBaseException, e:
417 self.record(e.exit_status, subdir, testname, str(e))
418 raise
419 except Exception, e:
420 info = str(e) + "\n" + traceback.format_exc()
421 self.record('FAIL', subdir, testname, info)
422 raise
423 else:
424 self.record('GOOD', subdir, testname,
425 'completed successfully')
jadmanskide292df2008-08-26 20:51:14 +0000426
427 result, exc_info = self._run_group(testname, subdir, group_func)
428 if exc_info and isinstance(exc_info[1], error.TestBaseException):
429 return False
430 elif exc_info:
431 raise exc_info[0], exc_info[1], exc_info[2]
432 else:
433 return True
jadmanski10646442008-08-13 14:05:21 +0000434
435
436 def _run_group(self, name, subdir, function, *args, **dargs):
437 """\
438 Underlying method for running something inside of a group.
439 """
jadmanskide292df2008-08-26 20:51:14 +0000440 result, exc_info = None, None
jadmanski10646442008-08-13 14:05:21 +0000441 old_record_prefix = self.record_prefix
442 try:
443 self.record('START', subdir, name)
444 self.record_prefix += '\t'
445 try:
446 result = function(*args, **dargs)
447 finally:
448 self.record_prefix = old_record_prefix
449 except error.TestBaseException, e:
450 self.record("END %s" % e.exit_status, subdir, name, str(e))
jadmanskide292df2008-08-26 20:51:14 +0000451 exc_info = sys.exc_info()
jadmanski10646442008-08-13 14:05:21 +0000452 except Exception, e:
453 err_msg = str(e) + '\n'
454 err_msg += traceback.format_exc()
455 self.record('END ABORT', subdir, name, err_msg)
456 raise error.JobError(name + ' failed\n' + traceback.format_exc())
457 else:
458 self.record('END GOOD', subdir, name)
459
jadmanskide292df2008-08-26 20:51:14 +0000460 return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000461
462
463 def run_group(self, function, *args, **dargs):
464 """\
465 function:
466 subroutine to run
467 *args:
468 arguments for the function
469 """
470
471 name = function.__name__
472
473 # Allow the tag for the group to be specified.
474 tag = dargs.pop('tag', None)
475 if tag:
476 name = tag
477
jadmanskide292df2008-08-26 20:51:14 +0000478 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000479
480
481 def run_reboot(self, reboot_func, get_kernel_func):
482 """\
483 A specialization of run_group meant specifically for handling
484 a reboot. Includes support for capturing the kernel version
485 after the reboot.
486
487 reboot_func: a function that carries out the reboot
488
489 get_kernel_func: a function that returns a string
490 representing the kernel version.
491 """
492
493 old_record_prefix = self.record_prefix
494 try:
495 self.record('START', None, 'reboot')
496 self.record_prefix += '\t'
497 reboot_func()
498 except Exception, e:
499 self.record_prefix = old_record_prefix
500 err_msg = str(e) + '\n' + traceback.format_exc()
501 self.record('END FAIL', None, 'reboot', err_msg)
502 else:
503 kernel = get_kernel_func()
504 self.record_prefix = old_record_prefix
505 self.record('END GOOD', None, 'reboot',
506 optional_fields={"kernel": kernel})
507
508
509 def record(self, status_code, subdir, operation, status='',
510 optional_fields=None):
511 """
512 Record job-level status
513
514 The intent is to make this file both machine parseable and
515 human readable. That involves a little more complexity, but
516 really isn't all that bad ;-)
517
518 Format is <status code>\t<subdir>\t<operation>\t<status>
519
520 status code: see common_lib.logging.is_valid_status()
521 for valid status definition
522
523 subdir: MUST be a relevant subdirectory in the results,
524 or None, which will be represented as '----'
525
526 operation: description of what you ran (e.g. "dbench", or
527 "mkfs -t foobar /dev/sda9")
528
529 status: error message or "completed sucessfully"
530
531 ------------------------------------------------------------
532
533 Initial tabs indicate indent levels for grouping, and is
534 governed by self.record_prefix
535
536 multiline messages have secondary lines prefaced by a double
537 space (' ')
538
539 Executing this method will trigger the logging of all new
540 warnings to date from the various console loggers.
541 """
542 # poll all our warning loggers for new warnings
543 warnings = self._read_warnings()
544 for timestamp, msg in warnings:
545 self._record("WARN", None, None, msg, timestamp)
546
547 # write out the actual status log line
548 self._record(status_code, subdir, operation, status,
549 optional_fields=optional_fields)
550
551
552 def _read_warnings(self):
553 warnings = []
554 while True:
555 # pull in a line of output from every logger that has
556 # output ready to be read
557 loggers, _, _ = select.select(self.warning_loggers,
558 [], [], 0)
559 closed_loggers = set()
560 for logger in loggers:
561 line = logger.readline()
562 # record any broken pipes (aka line == empty)
563 if len(line) == 0:
564 closed_loggers.add(logger)
565 continue
566 timestamp, msg = line.split('\t', 1)
567 warnings.append((int(timestamp), msg.strip()))
568
569 # stop listening to loggers that are closed
570 self.warning_loggers -= closed_loggers
571
572 # stop if none of the loggers have any output left
573 if not loggers:
574 break
575
576 # sort into timestamp order
577 warnings.sort()
578 return warnings
579
580
581 def _render_record(self, status_code, subdir, operation, status='',
582 epoch_time=None, record_prefix=None,
583 optional_fields=None):
584 """
585 Internal Function to generate a record to be written into a
586 status log. For use by server_job.* classes only.
587 """
588 if subdir:
589 if re.match(r'[\n\t]', subdir):
590 raise ValueError(
591 'Invalid character in subdir string')
592 substr = subdir
593 else:
594 substr = '----'
595
596 if not logging.is_valid_status(status_code):
597 raise ValueError('Invalid status code supplied: %s' %
598 status_code)
599 if not operation:
600 operation = '----'
601 if re.match(r'[\n\t]', operation):
602 raise ValueError(
603 'Invalid character in operation string')
604 operation = operation.rstrip()
605 status = status.rstrip()
606 status = re.sub(r"\t", " ", status)
607 # Ensure any continuation lines are marked so we can
608 # detect them in the status file to ensure it is parsable.
609 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
610
611 if not optional_fields:
612 optional_fields = {}
613
614 # Generate timestamps for inclusion in the logs
615 if epoch_time is None:
616 epoch_time = int(time.time())
617 local_time = time.localtime(epoch_time)
618 optional_fields["timestamp"] = str(epoch_time)
619 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
620 local_time)
621
622 fields = [status_code, substr, operation]
623 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
624 fields.append(status)
625
626 if record_prefix is None:
627 record_prefix = self.record_prefix
628
629 msg = '\t'.join(str(x) for x in fields)
630
631 return record_prefix + msg + '\n'
632
633
634 def _record_prerendered(self, msg):
635 """
636 Record a pre-rendered msg into the status logs. The only
637 change this makes to the message is to add on the local
638 indentation. Should not be called outside of server_job.*
639 classes. Unlike _record, this does not write the message
640 to standard output.
641 """
642 lines = []
643 status_file = os.path.join(self.resultdir, 'status.log')
644 status_log = open(status_file, 'a')
645 for line in msg.splitlines():
646 line = self.record_prefix + line + '\n'
647 lines.append(line)
648 status_log.write(line)
649 status_log.close()
650 self.__parse_status(lines)
651
652
653 def _execute_code(self, code, global_scope, local_scope):
654 exec(code, global_scope, local_scope)
655
656
657 def _record(self, status_code, subdir, operation, status='',
658 epoch_time=None, optional_fields=None):
659 """
660 Actual function for recording a single line into the status
661 logs. Should never be called directly, only by job.record as
662 this would bypass the console monitor logging.
663 """
664
665 msg = self._render_record(status_code, subdir, operation,
666 status, epoch_time,
667 optional_fields=optional_fields)
668
669
670 status_file = os.path.join(self.resultdir, 'status.log')
671 sys.stdout.write(msg)
672 open(status_file, "a").write(msg)
673 if subdir:
674 test_dir = os.path.join(self.resultdir, subdir)
675 status_file = os.path.join(test_dir, 'status')
676 open(status_file, "a").write(msg)
677 self.__parse_status(msg.splitlines())
678
679
680 def __parse_status(self, new_lines):
681 if not self.using_parser:
682 return
683 new_tests = self.parser.process_lines(new_lines)
684 for test in new_tests:
685 self.__insert_test(test)
686
687
688 def __insert_test(self, test):
689 """ An internal method to insert a new test result into the
690 database. This method will not raise an exception, even if an
691 error occurs during the insert, to avoid failing a test
692 simply because of unexpected database issues."""
693 try:
694 self.results_db.insert_test(self.job_model, test)
695 except Exception:
696 msg = ("WARNING: An unexpected error occured while "
697 "inserting test results into the database. "
698 "Ignoring error.\n" + traceback.format_exc())
699 print >> sys.stderr, msg
700
mblighcaa62c22008-04-07 21:51:17 +0000701
jadmanskib6eb2f12008-09-12 16:39:36 +0000702
703 def collect_client_job_results(self):
704 """ A method that collects all the current results of a running
705 client job into the results dir. By default does nothing as no
706 client job is running, but when running a client job you can override
707 this with something that will actually do something. """
708 pass
709
710
# A file-like object that catches the stderr of an autotest client and
# pulls the status log lines out of it.
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file.  We only implement those methods
    utils.run() actually calls.
    """
    # "AUTOTEST_STATUS:<tag>:<status line>"
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # marker line printed by the client
    test_complete = re.compile(r"^AUTOTEST_TEST_COMPLETE$")
    # captures the leading tabs of a status line
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, host):
        """host: the host the client runs on; its `job` attribute is
        the job that status lines are recorded into."""
        self.host = host
        self.job = host.job
        # partial line left over from the last write()
        self.leftover = ""
        # last status line that was recorded
        self.last_line = ""
        # nested dictionary of buffered lines: integer tag parts map to
        # sub-dictionaries, "logs" maps to the list of lines of a node
        self.logs = {}


    def _process_log_dict(self, log_dict):
        """Empty log_dict and return all the lines it held: the
        node's own lines first, then those of every child in
        ascending key order."""
        collected = log_dict.pop("logs", [])
        for child in sorted(log_dict):
            collected.extend(self._process_log_dict(log_dict.pop(child)))
        return collected


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and record
        them in the status log. Note that this processes logs in an
        ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        ordered = self._process_log_dict(self.logs)
        for entry in ordered:
            self.job._record_prerendered(entry + '\n')
        if ordered:
            self.last_line = ordered[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print(line)
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
            return
        # convert the whole tag before touching self.logs
        path = [int(part) for part in tag.split(".")]
        node = self.logs
        for part in path:
            node = node.setdefault(part, {})
        node.setdefault("logs", []).append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status = self.parser.search(line)
        if status:
            tag, payload = status.groups()
            self._process_quoted_line(tag, payload)
            return
        if self.test_complete.search(line):
            # collect the results, then write to the client's fifo
            self.job.collect_client_job_results()
            fifo = os.path.join(self.host.get_autodir(), "autoserv.fifo")
            self.host.run("echo A > %s" % fifo)
            return
        print(line)


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warnings as WARN status lines
        (without trailing newline), indented to fit after last_line."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        rendered = []
        for timestamp, msg in warnings:
            record = self.job._render_record("WARN", None, None, msg,
                                             timestamp, indent)
            rendered.append(record.rstrip('\n'))
        return rendered


    def _process_warnings(self, last_line, log_dict, warnings):
        """Add the warnings to the buffered logs: directly to
        log_dict when it has no sub-jobs, otherwise to every sub-job
        below it."""
        sub_jobs = sorted(key for key in log_dict if key != "logs")
        if not sub_jobs:
            # there are no sub-jobs, just append the warnings here
            formatted = self._format_warnings(last_line, warnings)
            log_dict.setdefault("logs", []).extend(formatted)
            for text in formatted:
                sys.stdout.write(text + '\n')
            return
        # there are sub-jobs, so put the warnings in there
        own_logs = log_dict.get("logs", [])
        if own_logs:
            last_line = own_logs[-1]
        for key in sub_jobs:
            self._process_warnings(last_line, log_dict[key], warnings)


    def write(self, data):
        """Process a chunk of client output; an incomplete last line
        is kept until more data arrives."""
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        pieces = (self.leftover + data).split("\n")
        # everything but the last piece is a complete line; the last
        # one may still be partial, so keep it for later
        for complete in pieces[:-1]:
            self._process_line(complete)
        self.leftover = pieces[-1]


    def flush(self):
        """Flush standard output."""
        sys.stdout.flush()


    def close(self):
        """Process any leftover partial line and push out all the
        buffered logs."""
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
849
850
# site_server_job.py may be missing or empty; make sure that a
# site_server_job class exists either way
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        """Empty stand-in used when no site-specific job class exists."""
858
class server_job(site_server_job, base_server_job):
    """The server job class: base_server_job with site_server_job placed
    ahead of it in the method resolution order, so site-specific code can
    override the base behaviour."""
    pass