"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
import shutil, warnings
from autotest_lib.client.bin import fd_stack, sysinfo
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)
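# For reference (illustrative): _control_segment_path('verify') resolves to
# <autodir>/server/control_segments/verify.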


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}


class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
                control
                        The control file (pathname of)
                args
                        args to pass to the control file
                resultdir
                        where to throw the results
                label
                        label for the job
                user
                        Username for the job (email address)
                client
                        True if a client-side control file
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = ''
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None
        self.hosts = set()

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the directory
                # already existing, this chmod should throw an exception
                # as well.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        self.num_tests_run = 0
        self.num_tests_failed = 0


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(REPAIR_CONTROL_FILE, namespace,
                               protect=False)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.
        """
        pass


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
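
    # Usage sketch (illustrative; install_host is hypothetical):
    #     def install_host(machine):
    #         ...
    #     job.parallel_simple(install_host, job.machines)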


    def run(self, cleanup = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            if self.client:
                namespace['control'] = self.control
                utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
                shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
                            SERVER_CONTROL_FILENAME)
            else:
                utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
            self._execute_code(SERVER_CONTROL_FILENAME, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
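
    # Illustrative driver sketch (names and paths hypothetical):
    #     job = server_job(control_path, args, resultdir, label, user,
    #                      machines)
    #     job.run(cleanup=True, collect_crashdumps=True)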


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
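
    # Example (illustrative): job.run_test('sleeptest', tag='smoke') would
    # write results under <resultdir>/sleeptest.smoke and return True on
    # success, False if the test raised a TestBaseException.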


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]
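
    # Usage sketch (illustrative; my_steps is hypothetical):
    #     def my_steps():
    #         ...
    #     job.run_group(my_steps, tag='setup')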


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
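
    # Usage sketch (illustrative; assumes a host object exposing a reboot
    # method and a kernel-version query):
    #     job.run_reboot(host.reboot, host.get_kernel_ver)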


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
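
    # Example (illustrative): collect dmesg after every test and
    # /proc/meminfo once per boot:
    #     job.add_sysinfo_command('dmesg -c', logfile='dmesg',
    #                             on_every_test=True)
    #     job.add_sysinfo_logfile('/proc/meminfo')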


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space (' ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'
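
    # An example rendered line (illustrative; tabs shown as '\t'):
    #     GOOD\t----\tdbench\ttimestamp=1225000000\t
    #         localtime=Nov 05 19:32:53\tcompleted successfully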


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                       'source_kernel', 'rpm_kernel', 'deb_kernel',
                       'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public
        # API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self


    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not
        be modified.
        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during
              execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
793 msg = ("WARNING: An unexpected error occured while "
794 "inserting test results into the database. "
795 "Ignoring error.\n" + traceback.format_exc())
796 print >> sys.stderr, msg
797
mblighcaa62c22008-04-07 21:51:17 +0000798
jadmanskib6eb2f12008-09-12 16:39:36 +0000799
jadmanskia1f3c202008-09-15 19:17:16 +0000800class log_collector(object):
801 def __init__(self, host, client_tag, results_dir):
802 self.host = host
803 if not client_tag:
804 client_tag = "default"
805 self.client_results_dir = os.path.join(host.get_autodir(), "results",
806 client_tag)
807 self.server_results_dir = results_dir
808
809
jadmanskib6eb2f12008-09-12 16:39:36 +0000810 def collect_client_job_results(self):
811 """ A method that collects all the current results of a running
812 client job into the results dir. By default does nothing as no
813 client job is running, but when running a client job you can override
814 this with something that will actually do something. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass


        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We will squirrel the client side keyval away
                # and move it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
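
    # An illustrative client line matched by status_parser; the tag ("0.1")
    # routes the payload into the nested log dict, and the payload is a
    # rendered status line (tabs shown as '\t'):
    #     AUTOTEST_STATUS:0.1:GOOD\t----\tsleeptest\tcompleted successfully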

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
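        # Recursively flatten the nested log dict: this level's "logs" list
        # comes first, then each numbered sub-tag in sorted order.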
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
924 """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the status
        log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass

class server_job(site_server_job, base_server_job):
    pass