blob: df773b8aa3be8af5cd03c99a1f3c81480073be37 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
jadmanski025099d2008-09-23 14:13:48 +00009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
jadmanski10646442008-08-13 14:05:21 +000010from autotest_lib.client.bin import fd_stack
mbligh09108442008-10-15 16:27:38 +000011from autotest_lib.client.common_lib import error, log, utils, packages
jadmanski10646442008-08-13 14:05:21 +000012from autotest_lib.server import test, subcommand
13from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000014
15
16# load up a control segment
17# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    """Load a named control segment from <server_dir>/control_segments.

    name: basename of the control segment file to load.

    Returns the file's contents as a string, or the empty string if no
    such segment exists (missing segments are simply treated as empty,
    which lets optional site-specific hooks be absent).
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        # Use open() + try/finally instead of the legacy file()
        # constructor so the handle is closed deterministically rather
        # than whenever the GC gets around to it.
        segment = open(script_file)
        try:
            return segment.read()
        finally:
            segment.close()
    else:
        return ""
25
26
# Control-segment preamble: prepended to every piece of control code run via
# _execute_code().  It imports the server-side API into the exec namespace,
# points the Autotest/Host classes back at the current job, and records the
# machine list to a .machines file for multi-machine jobs.  Note the
# namespace exec'ing this is expected to provide 'job' and 'machines'.
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
45
# Control segment used when the job's control file is a *client-side* one:
# it pushes the control file to each machine (via autotest.Autotest.run)
# and runs the clients in parallel.  Exec'd with 'control', 'machines' and
# the ssh_* settings in the namespace (see base_server_job.run).
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
59
# Control segment that gathers kernel crashdumps newer than test_start_time
# from every machine after a job ran.  Exec'd with 'test_start_time' added
# to the namespace; log=False keeps it out of the status log.
crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
71
jadmanskicdd0c402008-09-19 21:21:31 +000072
73crashinfo = """
74def crashinfo(machine):
75 hostname, user, passwd, port = parse_machine(machine, ssh_user,
76 ssh_port, ssh_pass)
77
78 host = hosts.create_host(hostname, user=user, port=port,
79 initialize=False, password=passwd)
80 host.get_crashinfo(test_start_time)
81
82job.parallel_simple(crashinfo, machines, log=False)
83"""
84
85
jadmanski10646442008-08-13 14:05:21 +000086reboot_segment="""\
87def reboot(machine):
jadmanski1c5e3a12008-08-15 23:08:20 +000088 hostname, user, passwd, port = parse_machine(machine, ssh_user,
89 ssh_port, ssh_pass)
jadmanski10646442008-08-13 14:05:21 +000090
jadmanski8e72aaf2008-08-20 19:22:29 +000091 host = hosts.create_host(hostname, user=user, port=port,
jadmanski1c5e3a12008-08-15 23:08:20 +000092 initialize=False, password=passwd)
93 host.reboot()
jadmanski10646442008-08-13 14:05:21 +000094
95job.parallel_simple(reboot, machines, log=False)
96"""
97
# Control segment that reinstalls (machine_install) every machine in
# parallel; used for run(install_before=True) / run(install_after=True).
install="""\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
109
# load up the verifier control segment, with an optional site-specific hook;
# the site_* segment (if present) runs first, then the stock segment
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
# (same ordering: site hook first, then the stock segment)
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
117
118
# load up site-specific code for generating site-specific job data;
# get_site_job_data(job) must return a dict of extra keyvals to merge into
# the job-level keyval file (see base_server_job.__init__)
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
128
129
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # Version of the status log format this job writes; recorded in the
    # job keyval so the TKO parser picks the matching parser version.
    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        client
                True if a client-side control file
        parse_job
                if non-empty, the job tag under which results are
                continuously parsed into the results database
        ssh_user/ssh_port/ssh_pass
                credentials handed to the exec'd control segments
        """
        # derive all the standard autotest directory locations relative
        # to this file (<autodir>/server/server_job.py)
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # read the control file in and normalize CRLF line endings
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # record_prefix holds the current indentation (tabs) for nested
        # START/END groups in the status log
        self.record_prefix = ''
        # file-like objects polled for console warnings in record()
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        # stackable redirections of our stdout/stderr
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        # make sure we have a writable tmpdir, falling back to a
        # per-user directory under the system temp dir if not
        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        # start from a clean status file and write the job-level keyval,
        # merging in any site-specific job data
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        # continuous parsing only works for single-machine jobs; for
        # multi-machine jobs parsing is set up per-fork in parallel_simple
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (opened unbuffered so
        # debug output survives a crash)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # reattach to the existing job row
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        """Run the verify control segment against self.machines.

        Records ABORT and re-raises on any failure."""
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        """Run the repair control segment, then re-verify.

        host_protection: protection level passed into the repair
        segment's namespace; repair failures are printed but swallowed
        so that verify() always gets a chance to run."""
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.
        """
        pass


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # forking happens unless this is a single-machine call for the
        # job's own (single) machine
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            # forked + parsed: each child narrows the job down to its one
            # machine, moves into a per-machine resultdir and runs its own
            # parser (these mutations happen in the child process only)
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            # forked but unparsed: just switch to a per-machine resultdir
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


    def run(self, reboot = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        """Execute the job's control file (wrapped for client-side
        control files), with optional install/reboot/crashdump phases
        around it.  All control code runs via _execute_code with the
        standard preamble prepended."""
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        # NOTE(review): status_log is computed but not used in this
        # method as shown here
        status_log = os.path.join(self.resultdir, 'status.log')
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace)
            if self.client:
                # client-side control file: save it and run it through
                # the client_wrapper segment
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    script = crashinfo # includes crashdumps
                else:
                    script = crashdumps
                self._execute_code(preamble + script, namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True if the test passed, False if it raised a
        TestBaseException (already recorded); any other exception is
        re-raised with its original traceback.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        # refuse to reuse an existing output directory -- it most likely
        # means the same test/tag combination already ran
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # run the test, recording GOOD/FAIL/exit_status and
            # propagating the exception to _run_group
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # re-raise preserving the original traceback (Py2 3-arg raise)
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Emits START / END <status> records around the call, indenting
        nested records via record_prefix.  Returns (result, exc_info)
        where exc_info is non-None iff a TestBaseException escaped;
        any other exception is converted into END ABORT + JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function

        The group is named after the function unless a 'tag' keyword
        argument overrides it.  Returns the function's result.
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # reboot failure is recorded but deliberately not re-raised
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed sucessfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


    def _read_warnings(self):
        """Drain all pending warning lines from self.warning_loggers.

        Each line is expected to be '<timestamp>\t<message>'.  Returns
        a timestamp-sorted list of (int_timestamp, message) tuples;
        loggers that hit EOF are dropped from the set."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.

        Validates subdir/operation/status_code, normalizes whitespace,
        stamps the record with timestamp/localtime fields and returns
        the fully rendered, newline-terminated status line.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, scope):
        # run a control segment/file in the given namespace; scope is
        # used for both globals and locals so definitions persist
        exec(code, scope, scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # echo to stdout, append to the job status.log, and (if the
        # record belongs to a test) to that test's own status.log too
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        # feed freshly written status lines to the continuous parser
        # (no-op when continuous parsing is disabled)
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
744
mblighcaa62c22008-04-07 21:51:17 +0000745
jadmanskib6eb2f12008-09-12 16:39:36 +0000746
jadmanskia1f3c202008-09-15 19:17:16 +0000747class log_collector(object):
748 def __init__(self, host, client_tag, results_dir):
749 self.host = host
750 if not client_tag:
751 client_tag = "default"
752 self.client_results_dir = os.path.join(host.get_autodir(), "results",
753 client_tag)
754 self.server_results_dir = results_dir
755
756
jadmanskib6eb2f12008-09-12 16:39:36 +0000757 def collect_client_job_results(self):
758 """ A method that collects all the current results of a running
759 client job into the results dir. By default does nothing as no
760 client job is running, but when running a client job you can override
761 this with something that will actually do something. """
jadmanskia1f3c202008-09-15 19:17:16 +0000762
763 # make an effort to wait for the machine to come up
764 try:
765 self.host.wait_up(timeout=30)
766 except error.AutoservError:
767 # don't worry about any errors, we'll try and
768 # get the results anyway
769 pass
770
771
772 # Copy all dirs in default to results_dir
jadmanskibfb32f82008-10-03 23:13:36 +0000773 try:
774 keyval_path = self._prepare_for_copying_logs()
775 self.host.get_file(self.client_results_dir + '/',
776 self.server_results_dir)
777 self._process_copied_logs(keyval_path)
778 self._postprocess_copied_logs()
779 except Exception:
780 # well, don't stop running just because we couldn't get logs
781 print "Unexpected error copying test result logs, continuing ..."
782 traceback.print_exc(file=sys.stdout)
jadmanskia1f3c202008-09-15 19:17:16 +0000783
784
785 def _prepare_for_copying_logs(self):
786 server_keyval = os.path.join(self.server_results_dir, 'keyval')
787 if not os.path.exists(server_keyval):
788 # Client-side keyval file can be copied directly
789 return
790
791 # Copy client-side keyval to temporary location
792 suffix = '.keyval_%s' % self.host.hostname
793 fd, keyval_path = tempfile.mkstemp(suffix)
794 os.close(fd)
795 try:
796 client_keyval = os.path.join(self.client_results_dir, 'keyval')
797 try:
798 self.host.get_file(client_keyval, keyval_path)
799 finally:
800 # We will squirrel away the client side keyval
801 # away and move it back when we are done
802 remote_temp_dir = self.host.get_tmp_dir()
803 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
804 self.host.run('mv %s %s' % (client_keyval,
805 self.temp_keyval_path))
806 except (error.AutoservRunError, error.AutoservSSHTimeout):
807 print "Prepare for copying logs failed"
808 return keyval_path
809
810
811 def _process_copied_logs(self, keyval_path):
812 if not keyval_path:
813 # Client-side keyval file was copied directly
814 return
815
816 # Append contents of keyval_<host> file to keyval file
817 try:
818 # Read in new and old keyval files
819 new_keyval = utils.read_keyval(keyval_path)
820 old_keyval = utils.read_keyval(self.server_results_dir)
821 # 'Delete' from new keyval entries that are in both
822 tmp_keyval = {}
823 for key, val in new_keyval.iteritems():
824 if key not in old_keyval:
825 tmp_keyval[key] = val
826 # Append new info to keyval file
827 utils.write_keyval(self.server_results_dir, tmp_keyval)
828 # Delete keyval_<host> file
829 os.remove(keyval_path)
830 except IOError:
831 print "Process copied logs failed"
832
833
834 def _postprocess_copied_logs(self):
835 # we can now put our keyval file back
836 client_keyval = os.path.join(self.client_results_dir, 'keyval')
837 try:
838 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
839 except Exception:
840 pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000841
842
mbligh57e78662008-06-17 19:53:49 +0000843# a file-like object for catching stderr from an autotest client and
844# extracting status logs from it
845class client_logger(object):
846 """Partial file object to write to both stdout and
847 the status log file. We only implement those methods
848 utils.run() actually calls.
849 """
jadmanskia1f3c202008-09-15 19:17:16 +0000850 status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
851 test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
mbligh57e78662008-06-17 19:53:49 +0000852 extract_indent = re.compile(r"^(\t*).*$")
853
jadmanskia1f3c202008-09-15 19:17:16 +0000854 def __init__(self, host, tag, server_results_dir):
jadmanskib6eb2f12008-09-12 16:39:36 +0000855 self.host = host
856 self.job = host.job
jadmanskia1f3c202008-09-15 19:17:16 +0000857 self.log_collector = log_collector(host, tag, server_results_dir)
mbligh57e78662008-06-17 19:53:49 +0000858 self.leftover = ""
859 self.last_line = ""
860 self.logs = {}
861
862
    def _process_log_dict(self, log_dict):
        """Flatten a nested log dict into an ordered list of lines:
        a level's own "logs" first, then each numeric subtag in sorted
        order (recursively).  Note: empties log_dict as it goes."""
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list
868
869
    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            # remember the last flushed line for warning indentation
            self.last_line = log_list[-1]
888
889
890 def _process_quoted_line(self, tag, line):
891 """Process a line quoted with an AUTOTEST_STATUS flag. If the
892 tag is blank then we want to push out all the data we've been
893 building up in self.logs, and then the newest line. If the
894 tag is not blank, then push the line into the logs for handling
895 later."""
896 print line
897 if tag == "":
898 self._process_logs()
899 self.job._record_prerendered(line + '\n')
900 self.last_line = line
901 else:
902 tag_parts = [int(x) for x in tag.split(".")]
903 log_dict = self.logs
904 for part in tag_parts:
905 log_dict = log_dict.setdefault(part, {})
906 log_list = log_dict.setdefault("logs", [])
907 log_list.append(line)
908
909
910 def _process_line(self, line):
911 """Write out a line of data to the appropriate stream. Status
912 lines sent by autotest will be prepended with
913 "AUTOTEST_STATUS", and all other lines are ssh error
914 messages."""
jadmanskia1f3c202008-09-15 19:17:16 +0000915 status_match = self.status_parser.search(line)
916 test_complete_match = self.test_complete_parser.search(line)
917 if status_match:
918 tag, line = status_match.groups()
mbligh57e78662008-06-17 19:53:49 +0000919 self._process_quoted_line(tag, line)
jadmanskia1f3c202008-09-15 19:17:16 +0000920 elif test_complete_match:
921 fifo_path, = test_complete_match.groups()
922 self.log_collector.collect_client_job_results()
923 self.host.run("echo A > %s" % fifo_path)
mbligh57e78662008-06-17 19:53:49 +0000924 else:
925 print line
926
jadmanski4aeefe12008-06-20 20:04:25 +0000927
928 def _format_warnings(self, last_line, warnings):
mbligh57e78662008-06-17 19:53:49 +0000929 # use the indentation of whatever the last log line was
930 indent = self.extract_indent.match(last_line).group(1)
931 # if the last line starts a new group, add an extra indent
932 if last_line.lstrip('\t').startswith("START\t"):
933 indent += '\t'
934 return [self.job._render_record("WARN", None, None, msg,
935 timestamp, indent).rstrip('\n')
936 for timestamp, msg in warnings]
937
938
939 def _process_warnings(self, last_line, log_dict, warnings):
940 if log_dict.keys() in ([], ["logs"]):
941 # there are no sub-jobs, just append the warnings here
942 warnings = self._format_warnings(last_line, warnings)
943 log_list = log_dict.setdefault("logs", [])
944 log_list += warnings
945 for warning in warnings:
946 sys.stdout.write(warning + '\n')
947 else:
948 # there are sub-jobs, so put the warnings in there
949 log_list = log_dict.get("logs", [])
950 if log_list:
951 last_line = log_list[-1]
952 for key in sorted(log_dict.iterkeys()):
953 if key != "logs":
954 self._process_warnings(last_line,
955 log_dict[key],
956 warnings)
957
958
959 def write(self, data):
960 # first check for any new console warnings
961 warnings = self.job._read_warnings()
962 self._process_warnings(self.last_line, self.logs, warnings)
963 # now process the newest data written out
964 data = self.leftover + data
965 lines = data.split("\n")
966 # process every line but the last one
967 for line in lines[:-1]:
968 self._process_line(line)
969 # save the last line for later processing
970 # since we may not have the whole line yet
971 self.leftover = lines[-1]
972
973
974 def flush(self):
975 sys.stdout.flush()
976
977
978 def close(self):
979 if self.leftover:
980 self._process_line(self.leftover)
981 self._process_logs()
982 self.flush()
983
984
# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site-specific customizations available: fall back to an empty
    # mixin so the server_job class definition below remains valid
    class site_server_job(object):
        pass
992
class server_job(site_server_job, base_server_job):
    """The concrete server-side job class: base_server_job behavior
    with any site-specific customizations layered on via the
    site_server_job mixin (which, per MRO, takes precedence)."""