blob: cf404add9730b970bf0f6085bcd5e50c0d061635 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
jadmanski025099d2008-09-23 14:13:48 +00009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
mbligh084bc172008-10-18 14:02:45 +000010import shutil, warnings
jadmanskic09fc152008-10-15 17:56:59 +000011from autotest_lib.client.bin import fd_stack, sysinfo
mbligh09108442008-10-15 16:27:38 +000012from autotest_lib.client.common_lib import error, log, utils, packages
jadmanski10646442008-08-13 14:05:21 +000013from autotest_lib.server import test, subcommand
14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000015
16
mbligh084bc172008-10-18 14:02:45 +000017def _control_segment_path(name):
18 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000019 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000020 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000021
22
# Filenames used for the control files materialized in the results/work
# directory when running a job.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Pre-resolved paths to the standard server-side control segments that
# implement the install/verify/repair/cleanup/crash-collection phases.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000035
36
# Load up site-specific code for generating site-specific job data.
# If the optional site_job module is absent, fall back to a stub.
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # By default provide a stub that generates no site data.
    def get_site_job_data(job):
        """Stub used when no site_job module is installed.

        Returns an empty dict of site-specific job keyvals.
        """
        return {}
46
47
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # Version of the status-log format written by this job; consumed by
    # the tko status_lib parser (see init_parser).
    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results; may be None/empty, in which
                case no result/status/debug directories are set up
        label
                label for the job
        user
                Username for the job (email address)
        machines
                list of machine hostnames the job runs against
        client
                True if a client-side control file
        parse_job
                job tag for continuous result parsing; parsing is only
                enabled for single-machine jobs (see below)
        ssh_user, ssh_port, ssh_pass
                credentials passed through to the control segments
        """
        # Derive the autotest directory layout from this module's location.
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # Read the control file, normalizing DOS line endings.
            # NOTE(review): the file handle is never explicitly closed;
            # this relies on CPython refcounting.
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = ''
        self.resultdir = resultdir
        if resultdir:
            if not os.path.exists(resultdir):
                os.mkdir(resultdir)
            self.debugdir = os.path.join(resultdir, 'debug')
            if not os.path.exists(self.debugdir):
                os.mkdir(self.debugdir)
            self.status = os.path.join(resultdir, 'status')
        else:
            # No result directory: there is no status file either.
            self.status = None
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # Indentation prefix prepended to every status-log line; grows by
        # one tab per nested START/END group (see _run_group).
        self.record_prefix = ''
        # Open file-like objects polled for console warnings in record().
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        # Whether test.cleanup is run after each test (see
        # enable/disable_test_cleanup).
        self.run_test_cleanup = True
        self.last_boot_tag = None
        self.hosts = set()

        # Wrap fds 1/2 -- presumably to allow redirection of job output
        # through fd_stack; confirm in the fd_stack module.
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if resultdir:
            self.sysinfo = sysinfo.sysinfo(self.resultdir)

        # Make sure we have a writable tmpdir, falling back to a
        # per-user directory under the system temp dir.
        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        # Start from a clean status file and record the basic job keyvals.
        if self.status and os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        if self.resultdir:
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        # Continuous parsing is only supported for single-machine jobs;
        # multi-machine jobs re-enable it per machine in parallel_simple.
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        # Running totals maintained by __insert_test.
        self.num_tests_run = 0
        self.num_tests_failed = 0
jadmanski10646442008-08-13 14:05:21 +0000174
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.

        resultdir: directory whose status logs will be parsed; parser
        debug output goes to <resultdir>/.parse.log (unbuffered).
        """
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # Job already present: attach its db indices to our model so
            # subsequent test inserts reference the existing row.
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
200
201 def cleanup_parser(self):
202 """This should be called after the server job is finished
203 to carry out any remaining cleanup (e.g. flushing any
204 remaining test results to the results db)"""
205 if not self.using_parser:
206 return
207 final_tests = self.parser.end()
208 for test in final_tests:
209 self.__insert_test(test)
210 self.using_parser = False
211
212
    def verify(self):
        """Run the 'verify' control segment against self.machines.

        Raises error.AutoservError if no machines are configured; on
        any verify failure an ABORT line is recorded and the exception
        is re-raised.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            # protect=False: the segment is allowed to mutate this namespace.
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        """Run the 'repair' control segment, then re-verify.

        host_protection: protection level passed through to the repair
        segment as 'protection_level'. Repair errors are printed but
        swallowed so that verify() is always attempted afterwards.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(REPAIR_CONTROL_FILE, namespace,
                               protect=False)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()
244
245
    def precheck(self):
        """
        Perform any additional checks in derived classes.

        Base implementation is a no-op hook.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.

        No-op hook; derived classes may override.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.

        No-op hook; derived classes may override.
        """
        pass


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.

        Base implementation always returns False.
        """
        return False
279
280
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work.

        function: callable taking a single machine hostname argument.
        machines: list of hostnames to fan out over.
        log: if False, no per-machine setup wrapper is applied.
        timeout: passed through to subcommand.parallel_simple.
        """
        # We are "forking" per machine unless this is the degenerate
        # single-machine case where self.machines already matches.
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                # Runs in the forked child: narrow this job object down
                # to a single machine and start a per-machine parser in
                # a per-machine result subdirectory.
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                # Child without parsing: just switch into the
                # per-machine result subdirectory.
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
310
311
    def run(self, cleanup = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        """Run this job: execute the control file plus the surrounding
        install/crash-collection/cleanup control segments.

        cleanup: run the cleanup segment after the job.
        install_before/install_after: run the install segment before or
                after the control file.
        collect_crashdumps: collect crashdumps after the run.
        namespace: extra names made available to the control file. The
                mutable default is safe here because it is copied
                immediately and never mutated.
        """
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)

            self.enable_external_logging()
            # NOTE(review): status_log is computed but not referenced
            # again within this method.
            status_log = os.path.join(self.resultdir, 'status.log')
        # Crashinfo (the superset of crashdumps) is collected only when
        # the main body below fails before clearing this flag.
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            if self.client:
                # Client-side job: materialize the client control file
                # and run the server-side wrapper around it.
                namespace['control'] = self.control
                utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
                shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
                            SERVER_CONTROL_FILENAME)
            else:
                utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
            self._execute_code(SERVER_CONTROL_FILENAME, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000361
362
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True if the test ran cleanly, False if it failed with a
        TestBaseException (which has already been recorded); any other
        exception is re-raised with its original traceback.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        # Refuse to clobber the output of a previous run of this test.
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # Runs inside the START/END group; records the per-test
            # status line before propagating any failure.
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # Re-raise non-test exceptions with the original traceback
            # (Python 2 three-argument raise).
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Emits START/END <status> lines around 'function' and indents
        nested records by one tab. Returns (result, exc_info) where
        exc_info is non-None only for TestBaseException failures; any
        other exception is converted into error.JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                # Always restore the indentation level, even on failure.
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000435
436
437 def run_group(self, function, *args, **dargs):
438 """\
439 function:
440 subroutine to run
441 *args:
442 arguments for the function
443 """
444
445 name = function.__name__
446
447 # Allow the tag for the group to be specified.
448 tag = dargs.pop('tag', None)
449 if tag:
450 name = tag
451
jadmanskide292df2008-08-26 20:51:14 +0000452 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000453
454
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # Reboot failed: restore indentation and record the failure.
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # Only query the kernel version after a successful reboot.
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
481
482
jadmanskic09fc152008-10-15 17:56:59 +0000483 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
484 self._add_sysinfo_loggable(sysinfo.command(command, logfile),
485 on_every_test)
486
487
488 def add_sysinfo_logfile(self, file, on_every_test=False):
489 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
490
491
492 def _add_sysinfo_loggable(self, loggable, on_every_test):
493 if on_every_test:
494 self.sysinfo.test_loggables.add(loggable)
495 else:
496 self.sysinfo.boot_loggables.add(loggable)
497
498
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


    def _read_warnings(self):
        """Drain all pending warning lines from self.warning_loggers.

        Returns a list of (epoch_timestamp, message) tuples sorted by
        timestamp. Loggers whose pipes have closed are dropped from
        self.warning_loggers as a side effect.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # Each warning line is "<epoch>\t<message>".
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
569
570
571 def _render_record(self, status_code, subdir, operation, status='',
572 epoch_time=None, record_prefix=None,
573 optional_fields=None):
574 """
575 Internal Function to generate a record to be written into a
576 status log. For use by server_job.* classes only.
577 """
578 if subdir:
579 if re.match(r'[\n\t]', subdir):
580 raise ValueError(
581 'Invalid character in subdir string')
582 substr = subdir
583 else:
584 substr = '----'
585
mbligh1b3b3762008-09-25 02:46:34 +0000586 if not log.is_valid_status(status_code):
jadmanski10646442008-08-13 14:05:21 +0000587 raise ValueError('Invalid status code supplied: %s' %
588 status_code)
589 if not operation:
590 operation = '----'
591 if re.match(r'[\n\t]', operation):
592 raise ValueError(
593 'Invalid character in operation string')
594 operation = operation.rstrip()
595 status = status.rstrip()
596 status = re.sub(r"\t", " ", status)
597 # Ensure any continuation lines are marked so we can
598 # detect them in the status file to ensure it is parsable.
599 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
600
601 if not optional_fields:
602 optional_fields = {}
603
604 # Generate timestamps for inclusion in the logs
605 if epoch_time is None:
606 epoch_time = int(time.time())
607 local_time = time.localtime(epoch_time)
608 optional_fields["timestamp"] = str(epoch_time)
609 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
610 local_time)
611
612 fields = [status_code, substr, operation]
613 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
614 fields.append(status)
615
616 if record_prefix is None:
617 record_prefix = self.record_prefix
618
619 msg = '\t'.join(str(x) for x in fields)
620
621 return record_prefix + msg + '\n'
622
623
624 def _record_prerendered(self, msg):
625 """
626 Record a pre-rendered msg into the status logs. The only
627 change this makes to the message is to add on the local
628 indentation. Should not be called outside of server_job.*
629 classes. Unlike _record, this does not write the message
630 to standard output.
631 """
632 lines = []
633 status_file = os.path.join(self.resultdir, 'status.log')
634 status_log = open(status_file, 'a')
635 for line in msg.splitlines():
636 line = self.record_prefix + line + '\n'
637 lines.append(line)
638 status_log.write(line)
639 status_log.close()
640 self.__parse_status(lines)
641
642
    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self


    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        # Python 2 execfile: runs the control file with 'namespace' as
        # both globals and locals.
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +0000755
756
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.

        Writes the rendered line to stdout, the job-level status.log,
        and (when subdir is given) the per-test status.log, then feeds
        it to the continuous parser.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        # Feed freshly written status lines into the continuous parser
        # (when enabled) and insert any tests it completes.
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        # Keep the running pass/fail counters up to date regardless of
        # whether the database insert succeeds.
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
803
mblighcaa62c22008-04-07 21:51:17 +0000804
jadmanskib6eb2f12008-09-12 16:39:36 +0000805
class log_collector(object):
    """Collects the results of a running client-side job back into the
    server-side results directory."""

    def __init__(self, host, client_tag, results_dir):
        """
        host: the remote host object the client job runs on.
        client_tag: tag of the client results directory; defaults to
                "default" when empty/None.
        results_dir: server-side directory to copy results into.
        """
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir
815
    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job into the results dir. By default does nothing as no
        client job is running, but when running a client job you can override
        this with something that will actually do something. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass


        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)
jadmanskia1f3c202008-09-15 19:17:16 +0000842
843
844 def _prepare_for_copying_logs(self):
845 server_keyval = os.path.join(self.server_results_dir, 'keyval')
846 if not os.path.exists(server_keyval):
847 # Client-side keyval file can be copied directly
848 return
849
850 # Copy client-side keyval to temporary location
851 suffix = '.keyval_%s' % self.host.hostname
852 fd, keyval_path = tempfile.mkstemp(suffix)
853 os.close(fd)
854 try:
855 client_keyval = os.path.join(self.client_results_dir, 'keyval')
856 try:
857 self.host.get_file(client_keyval, keyval_path)
858 finally:
859 # We will squirrel away the client side keyval
860 # away and move it back when we are done
861 remote_temp_dir = self.host.get_tmp_dir()
862 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
863 self.host.run('mv %s %s' % (client_keyval,
864 self.temp_keyval_path))
865 except (error.AutoservRunError, error.AutoservSSHTimeout):
866 print "Prepare for copying logs failed"
867 return keyval_path
868
869
870 def _process_copied_logs(self, keyval_path):
871 if not keyval_path:
872 # Client-side keyval file was copied directly
873 return
874
875 # Append contents of keyval_<host> file to keyval file
876 try:
877 # Read in new and old keyval files
878 new_keyval = utils.read_keyval(keyval_path)
879 old_keyval = utils.read_keyval(self.server_results_dir)
880 # 'Delete' from new keyval entries that are in both
881 tmp_keyval = {}
882 for key, val in new_keyval.iteritems():
883 if key not in old_keyval:
884 tmp_keyval[key] = val
885 # Append new info to keyval file
886 utils.write_keyval(self.server_results_dir, tmp_keyval)
887 # Delete keyval_<host> file
888 os.remove(keyval_path)
889 except IOError:
890 print "Process copied logs failed"
891
892
893 def _postprocess_copied_logs(self):
894 # we can now put our keyval file back
895 client_keyval = os.path.join(self.client_results_dir, 'keyval')
896 try:
897 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
898 except Exception:
899 pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000900
901
mbligh57e78662008-06-17 19:53:49 +0000902# a file-like object for catching stderr from an autotest client and
903# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.

    Client status lines arrive embedded in the stream as
    "AUTOTEST_STATUS:<tag>:<line>" messages; "AUTOTEST_TEST_COMPLETE"
    markers trigger log collection; anything else is treated as an ssh
    error message and echoed to stdout.
    """
    # "AUTOTEST_STATUS:<tag>:<line>" -- groups are (tag, line)
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # "AUTOTEST_TEST_COMPLETE:<fifo path>" -- emitted when a client test ends
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    # captures the leading tabs of a line, used to indent warnings
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, host, tag, server_results_dir):
        """
        host: host object the client job is running on.
        tag: client job tag, used to locate the client results dir.
        server_results_dir: server-side dir that logs are collected into.
        """
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""   # unterminated tail of the last write() call
        self.last_line = ""  # most recent emitted line, for warning indent
        self.logs = {}       # nested dict of buffered tagged log lines


    def _process_log_dict(self, log_dict):
        """Destructively flatten a nested log dict into a single list,
        emitting each level's own "logs" entries before its numbered
        sub-tags, and sub-tags in ascending order."""
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # tag is a dotted path like "1.2.3"; walk/create the nested
            # dict level for each component and buffer the line there
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            # a client test finished: pull its results over, then write
            # into the fifo -- presumably the client blocks on reading it
            # until collection is done (handshake); TODO confirm on the
            # client side
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warning pairs as WARN status lines,
        indented to match last_line."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        """Insert console warnings into the buffered log dict, recursing
        into sub-jobs (numbered keys) so the warnings land inside the
        deepest currently-running group."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        """Accept a chunk of client output. Processes any pending console
        warnings first, then every complete line in the chunk; an
        unterminated trailing line is held over for the next call."""
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        # file-object API required by utils.run(); just flush stdout
        sys.stdout.flush()


    def close(self):
        """Flush any held-over partial line and all buffered logs."""
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
1042
1043
mblighcaa62c22008-04-07 21:51:17 +00001044# site_server_job.py may be non-existant or empty, make sure that an
1045# appropriate site_server_job class is created nevertheless
1046try:
jadmanski0afbb632008-06-06 21:10:57 +00001047 from autotest_lib.server.site_server_job import site_server_job
mblighcaa62c22008-04-07 21:51:17 +00001048except ImportError:
jadmanski10646442008-08-13 14:05:21 +00001049 class site_server_job(object):
jadmanski0afbb632008-06-06 21:10:57 +00001050 pass
1051
jadmanski10646442008-08-13 14:05:21 +00001052class server_job(site_server_job, base_server_job):
jadmanski0afbb632008-06-06 21:10:57 +00001053 pass