blob: eb24ad299d9621691542f3d2702e032f88366c88 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
jadmanski025099d2008-09-23 14:13:48 +00009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
mbligh084bc172008-10-18 14:02:45 +000010import shutil, warnings
jadmanskic09fc152008-10-15 17:56:59 +000011from autotest_lib.client.bin import fd_stack, sysinfo
mbligh09108442008-10-15 16:27:38 +000012from autotest_lib.client.common_lib import error, log, utils, packages
jadmanski10646442008-08-13 14:05:21 +000013from autotest_lib.server import test, subcommand
14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000015
16
mbligh084bc172008-10-18 14:02:45 +000017def _control_segment_path(name):
18 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000019 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000020 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000021
22
# Well-known filenames written into a job's working directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Absolute paths of the canned control segments that implement the
# standard phases of a server job (client wrapper, crash collection,
# reboot, install).
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
REBOOT_SEGMENT_CONTROL_FILE = _control_segment_path('reboot_segment')
INSTALL_CONTROL_FILE = _control_segment_path('install')

# XXX(gps): Dealing with the site_* stuff here is annoying.  The
# control_segments/verify and control_segments/repair code should be
# changed to load & run their site equivalents as the first thing they do.
# Doing that would get rid of the annoying 'protect=False' mechanism in
# the _execute_code() machinery below.
SITE_VERIFY_CONTROL_FILE = _control_segment_path('site_verify')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
SITE_REPAIR_CONTROL_FILE = _control_segment_path('site_repair')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000042
43
# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        # 'job' is accepted for interface compatibility; unused by the stub.
        return {}
53
54
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # Format version of the status logs this job writes; consumed by the
    # TKO status log parser (status_lib.parser(STATUS_VERSION)).
    STATUS_VERSION = 1
77
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        client
                True if a client-side control file
        """
        # Derive the standard autotest directory layout from the location
        # of this module (<autodir>/server/...).
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # Read the control file in, normalizing DOS line endings.
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # record_prefix holds the current status-log indentation (tabs).
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            # Fall back to a per-user directory under the system temp dir
            # when <serverdir>/tmp is unusable.
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        # Continuous result parsing is only supported for single-machine
        # jobs; multi-machine jobs set up per-machine parsers when forking
        # (see parallel_simple).
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        # Aggregate counters updated by __insert_test.
        self.num_tests_run = 0
        self.num_tests_failed = 0
jadmanski10646442008-08-13 14:05:21 +0000175
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (buffering=0 so debug
        # output reaches disk immediately)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # reuse the existing job row; remember its indices so new
            # test results attach to the right job/machine
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
201
202 def cleanup_parser(self):
203 """This should be called after the server job is finished
204 to carry out any remaining cleanup (e.g. flushing any
205 remaining test results to the results db)"""
206 if not self.using_parser:
207 return
208 final_tests = self.parser.end()
209 for test in final_tests:
210 self.__insert_test(test)
211 self.using_parser = False
212
213
    def verify(self):
        """Verify the job's machines by running the verify control segment.

        Raises error.AutoservError when no machines are configured; any
        verification failure is recorded as an ABORT in the status log
        and re-raised.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            # Protection is disabled to allow for site_verify()
            # to be defined in site_verify but called from its
            # "JP: deprecated" callsite in control_segments/verify.
            if os.path.exists(SITE_VERIFY_CONTROL_FILE):
                self._execute_code(SITE_VERIFY_CONTROL_FILE, namespace,
                                   protect=False)
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise
234
235
    def repair(self, host_protection):
        """Attempt to repair the job's machines, then re-verify them.

        host_protection: protection level limiting how aggressive the
                repair may be (presumably a host_protections value --
                confirm against callers).

        Repair failures are printed but never propagated: verification is
        always attempted afterwards and will raise on failure.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            # Protection is disabled to allow for site_repair_full() and
            # site_repair_filesystem_only() to be defined in site_repair but
            # called from the "JP: deprecated" areas of control_segments/repair.
            if os.path.exists(SITE_REPAIR_CONTROL_FILE):
                self._execute_code(SITE_REPAIR_CONTROL_FILE, namespace,
                                   protect=False)
            self._execute_code(REPAIR_CONTROL_FILE, namespace,
                               protect=False)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()
257
258
259 def precheck(self):
260 """
261 perform any additional checks in derived classes.
262 """
263 pass
264
265
266 def enable_external_logging(self):
267 """Start or restart external logging mechanism.
268 """
269 pass
270
271
272 def disable_external_logging(self):
273 """ Pause or stop external logging mechanism.
274 """
275 pass
276
277
jadmanski23afbec2008-09-17 18:12:07 +0000278 def enable_test_cleanup(self):
279 """ By default tests run test.cleanup """
280 self.run_test_cleanup = True
281
282
283 def disable_test_cleanup(self):
284 """ By default tests do not run test.cleanup """
285 self.run_test_cleanup = False
286
287
jadmanski10646442008-08-13 14:05:21 +0000288 def use_external_logging(self):
289 """Return True if external logging should be used.
290 """
291 return False
292
293
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # A fork happens whenever we run against more than one machine,
        # or a single machine that differs from the job's own list.
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                # Runs in the forked child: narrow the job down to this
                # one machine and give it its own parser and result dir.
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                # Forked child without parsing: just switch to the
                # per-machine result dir.
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
322
323
324 def run(self, reboot = False, install_before = False,
325 install_after = False, collect_crashdumps = True,
326 namespace = {}):
327 # use a copy so changes don't affect the original dictionary
328 namespace = namespace.copy()
329 machines = self.machines
330
331 self.aborted = False
332 namespace['machines'] = machines
333 namespace['args'] = self.args
334 namespace['job'] = self
335 namespace['ssh_user'] = self.ssh_user
336 namespace['ssh_port'] = self.ssh_port
337 namespace['ssh_pass'] = self.ssh_pass
338 test_start_time = int(time.time())
339
340 os.chdir(self.resultdir)
341
342 self.enable_external_logging()
343 status_log = os.path.join(self.resultdir, 'status.log')
jadmanskicdd0c402008-09-19 21:21:31 +0000344 collect_crashinfo = True
jadmanski10646442008-08-13 14:05:21 +0000345 try:
346 if install_before and machines:
mbligh084bc172008-10-18 14:02:45 +0000347 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000348 if self.client:
349 namespace['control'] = self.control
mbligh084bc172008-10-18 14:02:45 +0000350 utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
351 shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
352 SERVER_CONTROL_FILENAME)
jadmanski10646442008-08-13 14:05:21 +0000353 else:
mbligh084bc172008-10-18 14:02:45 +0000354 utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
355 self._execute_code(SERVER_CONTROL_FILENAME, namespace)
jadmanski10646442008-08-13 14:05:21 +0000356
jadmanskicdd0c402008-09-19 21:21:31 +0000357 # disable crashinfo collection if we get this far without error
358 collect_crashinfo = False
jadmanski10646442008-08-13 14:05:21 +0000359 finally:
jadmanskicdd0c402008-09-19 21:21:31 +0000360 if machines and (collect_crashdumps or collect_crashinfo):
jadmanski10646442008-08-13 14:05:21 +0000361 namespace['test_start_time'] = test_start_time
jadmanskicdd0c402008-09-19 21:21:31 +0000362 if collect_crashinfo:
mbligh084bc172008-10-18 14:02:45 +0000363 # includes crashdumps
364 self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
jadmanskicdd0c402008-09-19 21:21:31 +0000365 else:
mbligh084bc172008-10-18 14:02:45 +0000366 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000367 self.disable_external_logging()
368 if reboot and machines:
mbligh084bc172008-10-18 14:02:45 +0000369 self._execute_code(REBOOT_SEGMENT_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000370 if install_after and machines:
mbligh084bc172008-10-18 14:02:45 +0000371 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000372
373
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True on success, False when the test raised a
        TestBaseException; any other exception is re-raised.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # Run the test, recording GOOD/FAIL (or the exception's own
            # exit status) into the status log before re-raising failures.
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            # Test-level failures are reported through the return value...
            return False
        elif exc_info:
            # ...while non-test exceptions propagate to the caller.
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000419
420
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Returns a (result, exc_info) pair: result is the function's
        return value (None on failure); exc_info is sys.exc_info() if the
        function raised a TestBaseException, otherwise None.  Any other
        exception aborts the group and raises error.JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            # indent all records inside the group by one tab
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000446
447
448 def run_group(self, function, *args, **dargs):
449 """\
450 function:
451 subroutine to run
452 *args:
453 arguments for the function
454 """
455
456 name = function.__name__
457
458 # Allow the tag for the group to be specified.
459 tag = dargs.pop('tag', None)
460 if tag:
461 name = tag
462
jadmanskide292df2008-08-26 20:51:14 +0000463 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000464
465
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # restore indentation before logging the failure
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # capture the post-reboot kernel version into the status log
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
492
493
jadmanskic09fc152008-10-15 17:56:59 +0000494 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
495 self._add_sysinfo_loggable(sysinfo.command(command, logfile),
496 on_every_test)
497
498
499 def add_sysinfo_logfile(self, file, on_every_test=False):
500 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
501
502
503 def _add_sysinfo_loggable(self, loggable, on_every_test):
504 if on_every_test:
505 self.sysinfo.test_loggables.add(loggable)
506 else:
507 self.sysinfo.boot_loggables.add(loggable)
508
509
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)
551
552
    def _read_warnings(self):
        """Drain all pending console warnings.

        Returns a list of (timestamp, message) tuples sorted by
        timestamp, read from every registered warning logger pipe.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read (zero timeout: non-blocking poll)
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each line is "<epoch timestamp>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
580
581
582 def _render_record(self, status_code, subdir, operation, status='',
583 epoch_time=None, record_prefix=None,
584 optional_fields=None):
585 """
586 Internal Function to generate a record to be written into a
587 status log. For use by server_job.* classes only.
588 """
589 if subdir:
590 if re.match(r'[\n\t]', subdir):
591 raise ValueError(
592 'Invalid character in subdir string')
593 substr = subdir
594 else:
595 substr = '----'
596
mbligh1b3b3762008-09-25 02:46:34 +0000597 if not log.is_valid_status(status_code):
jadmanski10646442008-08-13 14:05:21 +0000598 raise ValueError('Invalid status code supplied: %s' %
599 status_code)
600 if not operation:
601 operation = '----'
602 if re.match(r'[\n\t]', operation):
603 raise ValueError(
604 'Invalid character in operation string')
605 operation = operation.rstrip()
606 status = status.rstrip()
607 status = re.sub(r"\t", " ", status)
608 # Ensure any continuation lines are marked so we can
609 # detect them in the status file to ensure it is parsable.
610 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
611
612 if not optional_fields:
613 optional_fields = {}
614
615 # Generate timestamps for inclusion in the logs
616 if epoch_time is None:
617 epoch_time = int(time.time())
618 local_time = time.localtime(epoch_time)
619 optional_fields["timestamp"] = str(epoch_time)
620 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
621 local_time)
622
623 fields = [status_code, substr, operation]
624 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
625 fields.append(status)
626
627 if record_prefix is None:
628 record_prefix = self.record_prefix
629
630 msg = '\t'.join(str(x) for x in fields)
631
632 return record_prefix + msg + '\n'
633
634
635 def _record_prerendered(self, msg):
636 """
637 Record a pre-rendered msg into the status logs. The only
638 change this makes to the message is to add on the local
639 indentation. Should not be called outside of server_job.*
640 classes. Unlike _record, this does not write the message
641 to standard output.
642 """
643 lines = []
644 status_file = os.path.join(self.resultdir, 'status.log')
645 status_log = open(status_file, 'a')
646 for line in msg.splitlines():
647 line = self.record_prefix + line + '\n'
648 lines.append(line)
649 status_log.write(line)
650 status_log.close()
651 self.__parse_status(lines)
652
653
    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
734
735
    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during
              execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines):
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # no existing .machines file (or it is unreadable)
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +0000766
767
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # echo to stdout and append to the job-level status log
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            # also mirror the line into the per-test status log
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())
789
790
791 def __parse_status(self, new_lines):
792 if not self.using_parser:
793 return
794 new_tests = self.parser.process_lines(new_lines)
795 for test in new_tests:
796 self.__insert_test(test)
797
798
    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        # track aggregate pass/fail counts for the job
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # deliberately best-effort: warn on stderr and carry on
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
814
mblighcaa62c22008-04-07 21:51:17 +0000815
jadmanskib6eb2f12008-09-12 16:39:36 +0000816
class log_collector(object):
    """Copies the results of a client-side job back to the server."""

    def __init__(self, host, client_tag, results_dir):
        # host: remote Host object the client job runs on.
        # client_tag: results subdirectory tag on the client; an empty
        #     value falls back to "default".
        # results_dir: server-side directory to copy results into.
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir
826
    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job into the results dir. By default does nothing as no
        client job is running, but when running a client job you can override
        this with something that will actually do something. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass


        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            # _postprocess_copied_logs is defined elsewhere in this file
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)
jadmanskia1f3c202008-09-15 19:17:16 +0000853
854
855 def _prepare_for_copying_logs(self):
856 server_keyval = os.path.join(self.server_results_dir, 'keyval')
857 if not os.path.exists(server_keyval):
858 # Client-side keyval file can be copied directly
859 return
860
861 # Copy client-side keyval to temporary location
862 suffix = '.keyval_%s' % self.host.hostname
863 fd, keyval_path = tempfile.mkstemp(suffix)
864 os.close(fd)
865 try:
866 client_keyval = os.path.join(self.client_results_dir, 'keyval')
867 try:
868 self.host.get_file(client_keyval, keyval_path)
869 finally:
870 # We will squirrel away the client side keyval
871 # away and move it back when we are done
872 remote_temp_dir = self.host.get_tmp_dir()
873 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
874 self.host.run('mv %s %s' % (client_keyval,
875 self.temp_keyval_path))
876 except (error.AutoservRunError, error.AutoservSSHTimeout):
877 print "Prepare for copying logs failed"
878 return keyval_path
879
880
881 def _process_copied_logs(self, keyval_path):
882 if not keyval_path:
883 # Client-side keyval file was copied directly
884 return
885
886 # Append contents of keyval_<host> file to keyval file
887 try:
888 # Read in new and old keyval files
889 new_keyval = utils.read_keyval(keyval_path)
890 old_keyval = utils.read_keyval(self.server_results_dir)
891 # 'Delete' from new keyval entries that are in both
892 tmp_keyval = {}
893 for key, val in new_keyval.iteritems():
894 if key not in old_keyval:
895 tmp_keyval[key] = val
896 # Append new info to keyval file
897 utils.write_keyval(self.server_results_dir, tmp_keyval)
898 # Delete keyval_<host> file
899 os.remove(keyval_path)
900 except IOError:
901 print "Process copied logs failed"
902
903
904 def _postprocess_copied_logs(self):
905 # we can now put our keyval file back
906 client_keyval = os.path.join(self.client_results_dir, 'keyval')
907 try:
908 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
909 except Exception:
910 pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000911
912
mbligh57e78662008-06-17 19:53:49 +0000913# a file-like object for catching stderr from an autotest client and
914# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.

    Status lines arrive from the client prefixed with AUTOTEST_STATUS
    markers; they are buffered per-tag in self.logs and flushed in tag
    order (see _process_logs). Everything else is passed through to
    stdout as-is.
    """
    # matches "AUTOTEST_STATUS:<tag>:<line>" markers emitted by the client
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # matches "AUTOTEST_TEST_COMPLETE:<fifo path>" end-of-test markers
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    # captures the leading tabs (indentation level) of a status line
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        # partial line carried over between write() calls
        self.leftover = ""
        # last status line emitted; used to pick warning indentation
        self.last_line = ""
        # nested dict of buffered log lines, keyed by the numeric parts
        # of their tag; the special key "logs" holds this level's lines
        self.logs = {}


    def _process_log_dict(self, log_dict):
        """Flatten the nested log_dict into a single ordered list of
        lines, emptying the dict as it goes. Lines at each level come
        before those of its numerically-sorted sub-tags."""
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # descend/create the nested dict for this dotted tag, e.g.
            # tag "1.2" buffers into self.logs[1][2]["logs"]
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            # grab the results first, then write to the fifo the client
            # told us about -- presumably the client blocks on it until
            # collection is done (TODO confirm against the client side)
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) pairs as WARN status lines, indented
        to match last_line (one level deeper when last_line opens a new
        START group). Returns the rendered lines."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        """Insert the (timestamp, msg) warnings into log_dict. If there
        are sub-jobs the warnings are recursively pushed down into each
        of them; otherwise they are rendered, appended at this level and
        echoed to stdout."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        """Consume a chunk of client stderr: drain any pending console
        warnings first, then handle every complete line, buffering any
        trailing partial line until the next write()."""
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        # we only ever print to stdout, so that's all that needs flushing
        sys.stdout.flush()


    def close(self):
        """Flush out the remaining partial line (if any) and all
        buffered logs."""
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
1053
1054
mblighcaa62c22008-04-07 21:51:17 +00001055# site_server_job.py may be non-existant or empty, make sure that an
1056# appropriate site_server_job class is created nevertheless
1057try:
jadmanski0afbb632008-06-06 21:10:57 +00001058 from autotest_lib.server.site_server_job import site_server_job
mblighcaa62c22008-04-07 21:51:17 +00001059except ImportError:
jadmanski10646442008-08-13 14:05:21 +00001060 class site_server_job(object):
jadmanski0afbb632008-06-06 21:10:57 +00001061 pass
1062
jadmanski10646442008-08-13 14:05:21 +00001063class server_job(site_server_job, base_server_job):
jadmanski0afbb632008-06-06 21:10:57 +00001064 pass