"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""
8
jadmanski025099d2008-09-23 14:13:48 +00009import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
mbligh084bc172008-10-18 14:02:45 +000010import shutil, warnings
jadmanskic09fc152008-10-15 17:56:59 +000011from autotest_lib.client.bin import fd_stack, sysinfo
mbligh09108442008-10-15 16:27:38 +000012from autotest_lib.client.common_lib import error, log, utils, packages
jadmanski10646442008-08-13 14:05:21 +000013from autotest_lib.server import test, subcommand
14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski10646442008-08-13 14:05:21 +000015
16
mbligh084bc172008-10-18 14:02:45 +000017def _control_segment_path(name):
18 """Get the pathname of the named control segment file."""
jadmanski10646442008-08-13 14:05:21 +000019 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh084bc172008-10-18 14:02:45 +000020 return os.path.join(server_dir, "control_segments", name)
jadmanski10646442008-08-13 14:05:21 +000021
22
# Standard filenames used when staging control files for a job run.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Pre-resolved paths to the control segment scripts implementing the
# standard job phases.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
jadmanski10646442008-08-13 14:05:21 +000035
36
# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    # drop the module reference; only the function is part of our API
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
46
47
class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    # Format version of the status logs this job writes; consumed by the
    # TKO parser (status_lib.parser).
    STATUS_VERSION = 1
69
70
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        client
                True if a client-side control file
        """
        # Derive the standard autotest directory layout relative to this
        # file (this module lives in <autodir>/server/).
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        # Slurp in the control file, normalizing away CR characters.
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = ''
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # record_prefix holds the current indent (tabs) for status lines
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        # Stacks used to redirect stdout/stderr while subcommands run.
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        # Ensure we have a writable tmpdir; fall back to a per-user
        # directory under the system temp dir if the default fails.
        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        # Start from a fresh status file and write the basic job keyvals.
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        # Continuous parsing into the results db is only supported for
        # single-machine jobs.
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        # Running totals used for job-level summary/reporting.
        self.num_tests_run = 0
        self.num_tests_failed = 0
167
jadmanski10646442008-08-13 14:05:21 +0000168
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (opened unbuffered)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # reuse the existing job row and its machine reference
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        # flush any test results the parser is still holding on to
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False
205
206
    def verify(self):
        """Run the 'verify' control segment against self.machines.

        Raises error.AutoservError when no machines are configured;
        any failure from the control segment is recorded as an ABORT
        in the status log and then re-raised.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        """Run the 'repair' control segment, then re-verify the machines.

        host_protection is exposed to the control file as
        'protection_level'.  Repair failures are printed but swallowed
        so that verify() always gets a chance to run afterwards.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(REPAIR_CONTROL_FILE, namespace,
                               protect=False)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()
238
239
240 def precheck(self):
241 """
242 perform any additional checks in derived classes.
243 """
244 pass
245
246
247 def enable_external_logging(self):
248 """Start or restart external logging mechanism.
249 """
250 pass
251
252
253 def disable_external_logging(self):
254 """ Pause or stop external logging mechanism.
255 """
256 pass
257
258
jadmanski23afbec2008-09-17 18:12:07 +0000259 def enable_test_cleanup(self):
260 """ By default tests run test.cleanup """
261 self.run_test_cleanup = True
262
263
264 def disable_test_cleanup(self):
265 """ By default tests do not run test.cleanup """
266 self.run_test_cleanup = False
267
268
jadmanski10646442008-08-13 14:05:21 +0000269 def use_external_logging(self):
270 """Return True if external logging should be used.
271 """
272 return False
273
274
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # We only fork when running against something other than the
        # single machine this job object was created for.
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            # Each forked child re-points this job at its own machine,
            # result subdirectory and parser instance.
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            # No parsing, but each child still works in its own subdir.
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
303
304
showard45ae8192008-11-05 19:32:53 +0000305 def run(self, cleanup = False, install_before = False,
jadmanski10646442008-08-13 14:05:21 +0000306 install_after = False, collect_crashdumps = True,
307 namespace = {}):
308 # use a copy so changes don't affect the original dictionary
309 namespace = namespace.copy()
310 machines = self.machines
311
312 self.aborted = False
313 namespace['machines'] = machines
314 namespace['args'] = self.args
315 namespace['job'] = self
316 namespace['ssh_user'] = self.ssh_user
317 namespace['ssh_port'] = self.ssh_port
318 namespace['ssh_pass'] = self.ssh_pass
319 test_start_time = int(time.time())
320
321 os.chdir(self.resultdir)
322
323 self.enable_external_logging()
324 status_log = os.path.join(self.resultdir, 'status.log')
jadmanskicdd0c402008-09-19 21:21:31 +0000325 collect_crashinfo = True
jadmanski10646442008-08-13 14:05:21 +0000326 try:
327 if install_before and machines:
mbligh084bc172008-10-18 14:02:45 +0000328 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000329 if self.client:
330 namespace['control'] = self.control
mbligh084bc172008-10-18 14:02:45 +0000331 utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
332 shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
333 SERVER_CONTROL_FILENAME)
jadmanski10646442008-08-13 14:05:21 +0000334 else:
mbligh084bc172008-10-18 14:02:45 +0000335 utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
336 self._execute_code(SERVER_CONTROL_FILENAME, namespace)
jadmanski10646442008-08-13 14:05:21 +0000337
jadmanskicdd0c402008-09-19 21:21:31 +0000338 # disable crashinfo collection if we get this far without error
339 collect_crashinfo = False
jadmanski10646442008-08-13 14:05:21 +0000340 finally:
jadmanskicdd0c402008-09-19 21:21:31 +0000341 if machines and (collect_crashdumps or collect_crashinfo):
jadmanski10646442008-08-13 14:05:21 +0000342 namespace['test_start_time'] = test_start_time
jadmanskicdd0c402008-09-19 21:21:31 +0000343 if collect_crashinfo:
mbligh084bc172008-10-18 14:02:45 +0000344 # includes crashdumps
345 self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
jadmanskicdd0c402008-09-19 21:21:31 +0000346 else:
mbligh084bc172008-10-18 14:02:45 +0000347 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000348 self.disable_external_logging()
showard45ae8192008-11-05 19:32:53 +0000349 if cleanup and machines:
350 self._execute_code(CLEANUP_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000351 if install_after and machines:
mbligh084bc172008-10-18 14:02:45 +0000352 self._execute_code(INSTALL_CONTROL_FILE, namespace)
jadmanski10646442008-08-13 14:05:21 +0000353
354
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True on success, False when the test failed with a
        TestBaseException; any other exception is re-raised with its
        original traceback.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        # Each test/tag combination gets its own output directory; refuse
        # to run the same combination twice.
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # Run the test, recording GOOD/FAIL/exit status in the log.
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # re-raise non-test exceptions with the original traceback
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
jadmanski10646442008-08-13 14:05:21 +0000400
401
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Writes START/END records around the call and returns a
        (result, exc_info) tuple.  exc_info is None unless the function
        raised a TestBaseException; any other exception becomes an
        'END ABORT' record plus a raised JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                # always restore the indent, even on failure
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function

        The group is named after the function unless a 'tag' keyword
        argument overrides it.  Returns the function's result.
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]
jadmanski10646442008-08-13 14:05:21 +0000445
446
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # reboot failed: record END FAIL but do not propagate
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # capture the kernel version we rebooted into
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
473
474
jadmanskic09fc152008-10-15 17:56:59 +0000475 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
476 self._add_sysinfo_loggable(sysinfo.command(command, logfile),
477 on_every_test)
478
479
480 def add_sysinfo_logfile(self, file, on_every_test=False):
481 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
482
483
484 def _add_sysinfo_loggable(self, loggable, on_every_test):
485 if on_every_test:
486 self.sysinfo.test_loggables.add(loggable)
487 else:
488 self.sysinfo.boot_loggables.add(loggable)
489
490
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed sucessfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)
532
533
    def _read_warnings(self):
        """Drain the registered warning loggers and return a list of
        (timestamp, message) tuples sorted by timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read (non-blocking select)
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each line is "<epoch_timestamp>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
561
562
    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            # None subdir is rendered as the '----' placeholder
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        # tab-separated: <code> <subdir> <operation> [key=value...] <status>
        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'
614
615
616 def _record_prerendered(self, msg):
617 """
618 Record a pre-rendered msg into the status logs. The only
619 change this makes to the message is to add on the local
620 indentation. Should not be called outside of server_job.*
621 classes. Unlike _record, this does not write the message
622 to standard output.
623 """
624 lines = []
625 status_file = os.path.join(self.resultdir, 'status.log')
626 status_log = open(status_file, 'a')
627 for line in msg.splitlines():
628 line = self.record_prefix + line + '\n'
629 lines.append(line)
630 status_log.write(line)
631 status_log.close()
632 self.__parse_status(lines)
633
634
    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
715
716
    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # missing or unreadable .machines file: just rewrite it
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)
jadmanski10646442008-08-13 14:05:21 +0000747
748
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # echo to stdout and append to the job-level status log
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        # mirror the line into the per-test status log, if applicable
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        """Feed freshly written status lines to the continuous parser;
        a no-op when parsing is not enabled."""
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        # keep the running pass/fail tallies up to date
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
795
mblighcaa62c22008-04-07 21:51:17 +0000796
jadmanskib6eb2f12008-09-12 16:39:36 +0000797
class log_collector(object):
    """Copies the results of a client-side job run on a remote host back
    into the server-side results directory."""

    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        # where the client stores its results on the remote machine
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job into the results dir. By default does nothing as no
        client job is running, but when running a client job you can override
        this with something that will actually do something. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass


        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        """Move the client-side keyval out of the way so the bulk copy
        does not clobber the server-side keyval.  Returns the local
        temporary path holding the client keyval, or None when a direct
        copy is safe."""
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We will squirrel away the client side keyval
                # away and move it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        """Merge entries from the saved client keyval (at keyval_path)
        into the server-side keyval, skipping duplicates."""
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass
jadmanskib6eb2f12008-09-12 16:39:36 +0000892
893
mbligh57e78662008-06-17 19:53:49 +0000894# a file-like object for catching stderr from an autotest client and
895# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")


    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""   # trailing partial line (no newline seen yet)
        self.last_line = ""  # most recent complete status line pushed out
        self.logs = {}       # nested dict of buffered status lines, by tag


    def _process_log_dict(self, log_dict):
        """Recursively flatten log_dict into a single list of log lines.

        Lines stored directly under the "logs" key come first, followed
        by the lines of each sub-tag in sorted order. The dict is
        drained (emptied) as a side effect."""
        flattened = log_dict.pop("logs", [])
        for subtag in sorted(log_dict):
            flattened += self._process_log_dict(log_dict.pop(subtag))
        return flattened


    def _process_logs(self):
        """Drain the logs accumulated in self.logs out to stdout and the
        status log. The emission ordering guarantees that:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        This is in general not chronological order; if a chronological
        view is needed it can be reconstructed from the timestamp lines
        in the status log."""
        drained = self._process_log_dict(self.logs)
        for entry in drained:
            self.job._record_prerendered(entry + '\n')
        if drained:
            self.last_line = drained[-1]


    def _process_quoted_line(self, tag, line):
        """Handle one line quoted with an AUTOTEST_STATUS flag.

        A blank tag flushes everything buffered so far and then records
        the new line immediately; a non-blank dotted tag (e.g. "1.2")
        buffers the line under the matching branch of self.logs for
        later emission."""
        print(line)
        if not tag:
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
            return
        branch = self.logs
        for level in tag.split("."):
            branch = branch.setdefault(int(level), {})
        branch.setdefault("logs", []).append(line)


    def _process_line(self, line):
        """Dispatch one line of client output.

        AUTOTEST_STATUS lines go to the status machinery,
        AUTOTEST_TEST_COMPLETE lines trigger a log collection followed by
        an ack written into the client's fifo, and anything else (e.g.
        ssh error noise) is simply echoed to stdout."""
        status = self.status_parser.search(line)
        if status:
            tag, payload = status.groups()
            self._process_quoted_line(tag, payload)
            return
        completion = self.test_complete_parser.search(line)
        if completion:
            fifo_path = completion.group(1)
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
            return
        print(line)


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warning pairs as WARN status records,
        indented to match whatever the last log line was."""
        indent = self.extract_indent.match(last_line).group(1)
        # a line that opens a new group indents its children one more stop
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        rendered = []
        for timestamp, msg in warnings:
            record = self.job._render_record("WARN", None, None, msg,
                                             timestamp, indent)
            rendered.append(record.rstrip('\n'))
        return rendered


    def _process_warnings(self, last_line, log_dict, warnings):
        """Push console warnings into the log tree.

        A leaf node (no sub-jobs) gets the rendered warnings appended to
        its "logs" list and echoed to stdout; an interior node recurses
        into each sub-job instead."""
        subtags = [key for key in log_dict if key != "logs"]
        if not subtags:
            rendered = self._format_warnings(last_line, warnings)
            log_dict.setdefault("logs", []).extend(rendered)
            for entry in rendered:
                sys.stdout.write(entry + '\n')
        else:
            direct_logs = log_dict.get("logs", [])
            if direct_logs:
                last_line = direct_logs[-1]
            for subtag in sorted(subtags):
                self._process_warnings(last_line, log_dict[subtag],
                                       warnings)


    def write(self, data):
        # fold in any console warnings that arrived since the last write
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # split the stream into whole lines; whatever trails the final
        # newline is held back until more data (or close()) arrives
        pieces = (self.leftover + data).split("\n")
        self.leftover = pieces.pop()
        for piece in pieces:
            self._process_line(piece)


    def flush(self):
        sys.stdout.flush()


    def close(self):
        # treat any held-back partial line as complete, then drain buffers
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
1034
1035
# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site customization available -- fall back to an empty stub so
    # that server_job's bases below still resolve
    class site_server_job(object):
        pass

# the concrete job class: optional site-specific behavior layered on top
# of base_server_job (defined earlier in this file)
class server_job(site_server_job, base_server_job):
    pass