blob: fa1eb9bd15519aac9dd59dac0359468b201b5bf7 [file] [log] [blame]
mbligh57e78662008-06-17 19:53:49 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
jadmanski025099d2008-09-23 14:13:48 +00009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
mbligh084bc172008-10-18 14:02:45 +000010import shutil, warnings
jadmanskic09fc152008-10-15 17:56:59 +000011from autotest_lib.client.bin import fd_stack, sysinfo
mbligh09108442008-10-15 16:27:38 +000012from autotest_lib.client.common_lib import error, log, utils, packages
jadmanski10646442008-08-13 14:05:21 +000013from autotest_lib.server import test, subcommand
14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000015
16
mbligh084bc172008-10-18 14:02:45 +000017def _control_segment_path(name):
18 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000019 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000020 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000021
22
# Filenames used when staging control files for execution.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Pre-resolved paths to the standard control segment scripts that the
# server job runs at the appropriate points of a job's lifecycle.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000035
36
# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
46
47
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # version of the status log format this job emits/parses
    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        client
                True if a client-side control file
        """
        # derive the autotest directory layout from this file's location
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # read the control file, normalizing away CR characters
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = ''
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # indent prefix for nested status-log records
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        # capture/redirect wrappers around stdout and stderr
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        # fall back to a per-user tmpdir under the system temp directory
        # if the in-tree tmpdir is unusable (e.g. not writable)
        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        # start each job with a fresh status file
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        # continuous parsing only works for single-machine jobs
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        # running totals maintained by __insert_test
        self.num_tests_run = 0
        self.num_tests_failed = 0
167
jadmanski10646442008-08-13 14:05:21 +0000168
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log, unbuffered (0) so
        # debug output is not lost if the process dies
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # job already present: attach the existing db indices to
            # our model so later inserts reference the same rows
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
193
194
195 def cleanup_parser(self):
196 """This should be called after the server job is finished
197 to carry out any remaining cleanup (e.g. flushing any
198 remaining test results to the results db)"""
199 if not self.using_parser:
200 return
201 final_tests = self.parser.end()
202 for test in final_tests:
203 self.__insert_test(test)
204 self.using_parser = False
205
206
207 def verify(self):
208 if not self.machines:
mbligh084bc172008-10-18 14:02:45 +0000209 raise error.AutoservError('No machines specified to verify')
jadmanski10646442008-08-13 14:05:21 +0000210 try:
jadmanskicdd0c402008-09-19 21:21:31 +0000211 namespace = {'machines' : self.machines, 'job' : self,
212 'ssh_user' : self.ssh_user,
213 'ssh_port' : self.ssh_port,
214 'ssh_pass' : self.ssh_pass}
mbligh084bc172008-10-18 14:02:45 +0000215 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
jadmanski10646442008-08-13 14:05:21 +0000216 except Exception, e:
217 msg = ('Verify failed\n' + str(e) + '\n'
218 + traceback.format_exc())
219 self.record('ABORT', None, None, msg)
220 raise
221
222
223 def repair(self, host_protection):
224 if not self.machines:
225 raise error.AutoservError('No machines specified to repair')
226 namespace = {'machines': self.machines, 'job': self,
227 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
228 'ssh_pass': self.ssh_pass,
229 'protection_level': host_protection}
230 # no matter what happens during repair, go on to try to reverify
231 try:
mbligh084bc172008-10-18 14:02:45 +0000232 self._execute_code(REPAIR_CONTROL_FILE, namespace,
233 protect=False)
jadmanski10646442008-08-13 14:05:21 +0000234 except Exception, exc:
235 print 'Exception occured during repair'
236 traceback.print_exc()
237 self.verify()
238
239
    def precheck(self):
        """
        perform any additional checks in derived classes.

        Hook for subclasses; the base implementation does nothing.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.

        Hook for subclasses; the base implementation does nothing.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.

        Hook for subclasses; the base implementation does nothing.
        """
        pass
257
258
    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.

        Hook for subclasses; the base implementation always says no.
        """
        return False
273
274
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # we fork unless this is a single-machine job running on
        # exactly this job's own machine list
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                # runs in the forked child: narrow this job object to
                # one machine, give it its own results subdirectory and
                # its own continuous parser
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                # no parsing, but each machine still gets its own
                # results subdirectory
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
304
305
showard45ae8192008-11-05 19:32:53 +0000306 def run(self, cleanup = False, install_before = False,
jadmanski10646442008-08-13 14:05:21 +0000307 install_after = False, collect_crashdumps = True,
308 namespace = {}):
309 # use a copy so changes don't affect the original dictionary
310 namespace = namespace.copy()
311 machines = self.machines
312
313 self.aborted = False
314 namespace['machines'] = machines
315 namespace['args'] = self.args
316 namespace['job'] = self
317 namespace['ssh_user'] = self.ssh_user
318 namespace['ssh_port'] = self.ssh_port
319 namespace['ssh_pass'] = self.ssh_pass
320 test_start_time = int(time.time())
321
322 os.chdir(self.resultdir)
323
324 self.enable_external_logging()
325 status_log = os.path.join(self.resultdir, 'status.log')
jadmanskicdd0c402008-09-19 21:21:31 +0000326 collect_crashinfo = True
jadmanski10646442008-08-13 14:05:21 +0000327 try:
328 if install_before and machines:
mbligh084bc172008-10-18 14:02:45 +0000329 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000330 if self.client:
331 namespace['control'] = self.control
mbligh084bc172008-10-18 14:02:45 +0000332 utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
333 shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
334 SERVER_CONTROL_FILENAME)
jadmanski10646442008-08-13 14:05:21 +0000335 else:
mbligh084bc172008-10-18 14:02:45 +0000336 utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
337 self._execute_code(SERVER_CONTROL_FILENAME, namespace)
jadmanski10646442008-08-13 14:05:21 +0000338
jadmanskicdd0c402008-09-19 21:21:31 +0000339 # disable crashinfo collection if we get this far without error
340 collect_crashinfo = False
jadmanski10646442008-08-13 14:05:21 +0000341 finally:
jadmanskicdd0c402008-09-19 21:21:31 +0000342 if machines and (collect_crashdumps or collect_crashinfo):
jadmanski10646442008-08-13 14:05:21 +0000343 namespace['test_start_time'] = test_start_time
jadmanskicdd0c402008-09-19 21:21:31 +0000344 if collect_crashinfo:
mbligh084bc172008-10-18 14:02:45 +0000345 # includes crashdumps
346 self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
jadmanskicdd0c402008-09-19 21:21:31 +0000347 else:
mbligh084bc172008-10-18 14:02:45 +0000348 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000349 self.disable_external_logging()
showard45ae8192008-11-05 19:32:53 +0000350 if cleanup and machines:
351 self._execute_code(CLEANUP_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000352 if install_after and machines:
mbligh084bc172008-10-18 14:02:45 +0000353 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000354
355
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True on success; False when the test failed with a
        TestBaseException (already recorded in the status log).  Any
        other exception from the test is re-raised with its original
        traceback.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        # refuse to clobber results from a previous run of this test
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # run the test, recording its outcome in the status log
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # re-raise non-test failures with the original traceback
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000401
402
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Emits START/END records around the call, indenting any records
        produced inside the group.  Returns (result, exc_info) where
        exc_info is the sys.exc_info() triple when the function raised
        a TestBaseException (swallowed here so the caller can decide);
        any other exception becomes an END ABORT record and is
        re-raised as a JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                # always restore the indent level, pass or fail
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
jadmanski10646442008-08-13 14:05:21 +0000428
429
430 def run_group(self, function, *args, **dargs):
431 """\
432 function:
433 subroutine to run
434 *args:
435 arguments for the function
436 """
437
438 name = function.__name__
439
440 # Allow the tag for the group to be specified.
441 tag = dargs.pop('tag', None)
442 if tag:
443 name = tag
444
jadmanskide292df2008-08-26 20:51:14 +0000445 return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000446
447
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # reboot failed: record END FAIL but do not propagate
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # capture the post-reboot kernel version in the END record
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
474
475
    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Have sysinfo log the output of 'command' (into 'logfile')."""
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Have sysinfo collect a copy of the given log file."""
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # per-test loggables run after every test; boot loggables run
        # only once per boot
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
490
491
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)
533
534
    def _read_warnings(self):
        """Drain all pending warning lines from the registered warning
        loggers and return them as a time-sorted list of
        (timestamp, message) tuples.  Loggers whose pipes have closed
        are removed from self.warning_loggers."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each warning line is "<epoch>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
562
563
564 def _render_record(self, status_code, subdir, operation, status='',
565 epoch_time=None, record_prefix=None,
566 optional_fields=None):
567 """
568 Internal Function to generate a record to be written into a
569 status log. For use by server_job.* classes only.
570 """
571 if subdir:
572 if re.match(r'[\n\t]', subdir):
573 raise ValueError(
574 'Invalid character in subdir string')
575 substr = subdir
576 else:
577 substr = '----'
578
mbligh1b3b3762008-09-25 02:46:34 +0000579 if not log.is_valid_status(status_code):
jadmanski10646442008-08-13 14:05:21 +0000580 raise ValueError('Invalid status code supplied: %s' %
581 status_code)
582 if not operation:
583 operation = '----'
584 if re.match(r'[\n\t]', operation):
585 raise ValueError(
586 'Invalid character in operation string')
587 operation = operation.rstrip()
588 status = status.rstrip()
589 status = re.sub(r"\t", " ", status)
590 # Ensure any continuation lines are marked so we can
591 # detect them in the status file to ensure it is parsable.
592 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
593
594 if not optional_fields:
595 optional_fields = {}
596
597 # Generate timestamps for inclusion in the logs
598 if epoch_time is None:
599 epoch_time = int(time.time())
600 local_time = time.localtime(epoch_time)
601 optional_fields["timestamp"] = str(epoch_time)
602 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
603 local_time)
604
605 fields = [status_code, substr, operation]
606 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
607 fields.append(status)
608
609 if record_prefix is None:
610 record_prefix = self.record_prefix
611
612 msg = '\t'.join(str(x) for x in fields)
613
614 return record_prefix + msg + '\n'
615
616
    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        # feed the freshly written lines through the continuous parser
        # (a no-op when parsing is not enabled)
        self.__parse_status(lines)
634
635
    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
716
717
    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during
              execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        # NOTE: control files are trusted code; they run in-process with
        # full access to this job object via the prepared namespace
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +0000748
749
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            # mirror the record into the test's own status.log as well
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())
771
772
    def __parse_status(self, new_lines):
        # feed new status lines to the continuous parser and insert any
        # newly-completed tests into the results database; no-op when
        # continuous parsing is not enabled
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)
779
780
    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        # keep the pass/fail counters current whether or not the
        # database insert below succeeds
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
796
mblighcaa62c22008-04-07 21:51:17 +0000797
jadmanskib6eb2f12008-09-12 16:39:36 +0000798
class log_collector(object):
    """Copies the results of a running client-side job from the remote
    host back into this server job's results directory, taking care to
    merge (rather than clobber) the server-side keyval file."""

    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job into the results dir. By default does nothing as no
        client job is running, but when running a client job you can override
        this with something that will actually do something. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass


        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        # Returns the local temp path holding the client keyval, or None
        # when the server has no keyval yet (client copy is then direct).
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We will squirrel away the client side keyval
                # away and move it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        # Merge the client keyval (stashed at keyval_path) into the
        # server keyval, skipping keys the server already has.
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000893
894
mbligh57e78662008-06-17 19:53:49 +0000895# a file-like object for catching stderr from an autotest client and
896# extracting status logs from it
class client_logger(object):
    """File-like object that tees an autotest client's output to stdout
    and to the server-side status log. Only the handful of methods that
    utils.run() actually invokes on its stream arguments are provided.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")


    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        # partial line carried over from the previous write() call
        self.leftover = ""
        # most recent line pushed to the status log (drives indenting)
        self.last_line = ""
        # tree of buffered status lines, keyed by subjob tag components
        self.logs = {}


    def _process_log_dict(self, log_dict):
        """Drain log_dict into a flat list: this node's own "logs" first,
        then every numbered subtree in ascending tag order."""
        flattened = log_dict.pop("logs", [])
        for subtag in sorted(log_dict):
            flattened.extend(self._process_log_dict(log_dict.pop(subtag)))
        return flattened


    def _process_logs(self):
        """Flush every buffered status line out to the status log.

        The flush order guarantees that (1) lines with different tags are
        never interleaved, (2) x.y precedes x.y.z for all z, and (3) x.y
        precedes x.z whenever y < z. In general this is NOT chronological
        order; if that is wanted it can be reconstructed from the
        timestamp lines in the status log."""
        flushed = self._process_log_dict(self.logs)
        for entry in flushed:
            self.job._record_prerendered(entry + '\n')
        if flushed:
            self.last_line = flushed[-1]


    def _process_quoted_line(self, tag, line):
        """Handle a line quoted with an AUTOTEST_STATUS marker. A blank
        tag means the line belongs to the top-level job: flush all the
        buffered subjob logs and then record the line itself. A non-blank
        dotted tag buffers the line under the matching subjob subtree for
        later handling."""
        print(line)
        if not tag:
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
            return
        node = self.logs
        for component in tag.split("."):
            node = node.setdefault(int(component), {})
        node.setdefault("logs", []).append(line)


    def _process_line(self, line):
        """Dispatch one complete line of client output. Status lines
        arrive wrapped in AUTOTEST_STATUS markers and test-completion
        handshakes in AUTOTEST_TEST_COMPLETE markers; anything else
        (e.g. ssh error chatter) just goes to stdout."""
        status = self.status_parser.search(line)
        if status:
            tag, payload = status.groups()
            self._process_quoted_line(tag, payload)
            return
        completion = self.test_complete_parser.search(line)
        if completion:
            (fifo_path,) = completion.groups()
            # grab the results so far, then ack over the client's fifo
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
            return
        print(line)


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) console warnings as WARN status lines,
        indented to match whatever was last written to the log."""
        indent = self.extract_indent.match(last_line).group(1)
        # a line that opens a new group nests the warnings one deeper
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        rendered = []
        for timestamp, msg in warnings:
            record = self.job._render_record("WARN", None, None, msg,
                                             timestamp, indent)
            rendered.append(record.rstrip('\n'))
        return rendered


    def _process_warnings(self, last_line, log_dict, warnings):
        """Push console warnings into the deepest active subjob buffers,
        echoing each rendered warning to stdout as it is buffered."""
        if log_dict.keys() in ([], ["logs"]):
            # leaf node: render and append the warnings right here
            rendered = self._format_warnings(last_line, warnings)
            log_dict.setdefault("logs", []).extend(rendered)
            for warning in rendered:
                sys.stdout.write(warning + '\n')
            return
        # interior node: recurse into every subjob subtree
        local_logs = log_dict.get("logs", [])
        if local_logs:
            last_line = local_logs[-1]
        for subtag in sorted(log_dict):
            if subtag != "logs":
                self._process_warnings(last_line, log_dict[subtag],
                                       warnings)


    def write(self, data):
        # handle any console warnings that showed up since last time
        self._process_warnings(self.last_line, self.logs,
                               self.job._read_warnings())
        # split off the complete lines; the unterminated tail is kept
        # around until more data (or close) completes it
        lines = (self.leftover + data).split("\n")
        for line in lines[:-1]:
            self._process_line(line)
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        # push through any trailing partial line, then drain the buffers
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
1035
1036
# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site-specific customization available -- fall back to an empty
    # placeholder so the server_job MRO below stays valid
    class site_server_job(object):
        pass

# the concrete job class: optional site-specific hooks layered on top of
# base_server_job (defined earlier in this file)
class server_job(site_server_job, base_server_job):
    pass