"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
import shutil, warnings
from autotest_lib.client.bin import fd_stack, sysinfo
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
REBOOT_SEGMENT_CONTROL_FILE = _control_segment_path('reboot_segment')
INSTALL_CONTROL_FILE = _control_segment_path('install')

# XXX(gps): Dealing with the site_* stuff here is annoying.  The
# control_segments/verify and control_segments/repair code should be
# changed to load & run their site equivalents as the first thing they do.
# Doing that would get rid of the annoying 'protect=False' mechanism in
# the _execute_code() machinery below.
SITE_VERIFY_CONTROL_FILE = _control_segment_path('site_verify')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
SITE_REPAIR_CONTROL_FILE = _control_segment_path('site_repair')
REPAIR_CONTROL_FILE = _control_segment_path('repair')


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}


class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        site_testdir
            <autodir>/server/site_tests/
        control
            the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        client
            True if a client-side control file
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
            not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the directory
                # already existing, this chmod should throw an exception
                # as well.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


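    # A minimal sketch of constructing a job directly (paths, label, and
    # hostnames are purely illustrative; normally the autoserv frontend
    # builds this object for you):
    #
    #     job = server_job('/usr/local/autotest/server/control.srv', args=[],
    #                      resultdir='/tmp/results', label='smoke',
    #                      user='me@example.com', machines=['host1'])
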
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)."""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            # Protection is disabled to allow for site_verify()
            # to be defined in site_verify but called from its
            # "JP: deprecated" callsite in control_segments/verify.
            if os.path.exists(SITE_VERIFY_CONTROL_FILE):
                self._execute_code(SITE_VERIFY_CONTROL_FILE, namespace,
                                   protect=False)
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            # Protection is disabled to allow for site_repair_full() and
            # site_repair_filesystem_only() to be defined in site_repair but
            # called from the "JP: deprecated" areas of control_segments/repair.
            if os.path.exists(SITE_REPAIR_CONTROL_FILE):
                self._execute_code(SITE_REPAIR_CONTROL_FILE, namespace,
                                   protect=False)
            self._execute_code(REPAIR_CONTROL_FILE, namespace,
                               protect=False)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism."""
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism."""
        pass


    def enable_test_cleanup(self):
        """By default tests run test.cleanup."""
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """By default tests do not run test.cleanup."""
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used."""
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


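    # Usage sketch for parallel_simple() above (hypothetical; 'job' and
    # 'machines' are the names a server control file receives, and
    # hosts.SSHHost is assumed to be the host class in use):
    #
    #     def reboot_machine(machine):
    #         host = hosts.SSHHost(machine)
    #         host.reboot()
    #     job.parallel_simple(reboot_machine, machines)
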
    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            if self.client:
                namespace['control'] = self.control
                utils.open_write_close(CLIENT_CONTROL_FILENAME, self.control)
                shutil.copy(CLIENT_WRAPPER_CONTROL_FILE,
                            SERVER_CONTROL_FILENAME)
            else:
                utils.open_write_close(SERVER_CONTROL_FILENAME, self.control)
            self._execute_code(SERVER_CONTROL_FILENAME, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(REBOOT_SEGMENT_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


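    # Usage sketch for run_test() above, as called from a server-side control
    # file (test name and tag are illustrative):
    #
    #     job.run_test('sleeptest', tag='smoke')
    #
    # runs the test with results under the 'sleeptest.smoke' subdirectory and
    # returns True on success, False on a test failure.
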
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


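    # Usage sketch for run_group() above (names illustrative); START/END
    # records are written around whatever the function does:
    #
    #     def stress_suite():
    #         job.run_test('dbench')
    #         job.run_test('bonnie')
    #     job.run_group(stress_suite, tag='stress')
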
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


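    # Usage sketch for run_reboot() above, assuming 'host' is a host object
    # with reboot() and get_kernel_ver() methods (method names are an
    # assumption, not guaranteed by this module):
    #
    #     job.run_reboot(host.reboot, host.get_kernel_ver)
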
    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


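    # Usage sketch for the sysinfo helpers above (command and path are
    # illustrative): run 'dmesg -c' after every test, and snapshot
    # /proc/meminfo once per boot:
    #
    #     job.add_sysinfo_command('dmesg -c', logfile='dmesg', on_every_test=True)
    #     job.add_sysinfo_logfile('/proc/meminfo')
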
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status.

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definitions

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


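    # Given the format described in record() above, a passing test produces a
    # status.log line roughly like this (tabs rendered as \t; timestamp and
    # localtime values are illustrative):
    #
    #     GOOD\tsleeptest.smoke\tsleeptest.smoke\ttimestamp=1224340965\tlocaltime=Oct 18 14:02:45\tcompleted successfully
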
    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


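    # Each line handed to _read_warnings() above by a warning logger is
    # expected to be "<epoch timestamp>\t<message>\n", for example
    # (illustrative):
    #
    #     1224340965\tWARNING: soft lockup detected on console
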
    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _fill_server_control_namespace(self, namespace, protect=True):
        """Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
            namespace: The namespace dictionary to fill in.
            protect: Boolean.  If True (the default) any operation that would
                clobber an existing entry in namespace will cause an error.
        Raises:
            error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'kvm', 'git',
                       'standalone_profiler', 'source_kernel', 'rpm_kernel',
                       'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public
        # API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self


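    # Net effect of _fill_server_control_namespace() above: a control file
    # executed via _execute_code() can directly reference names such as
    # hosts, autotest, parallel_simple, run, sh_escape, barrier and the
    # contents of common_lib.error, in addition to the 'job', 'machines' and
    # ssh_* entries injected by run(), verify() and repair().
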
    def _execute_code(self, code_file, namespace, protect=True):
        """Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not be
        modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during
                execution.
            protect: Boolean.  If True (the default) a copy of the namespace
                dict is used during execution to prevent the code from
                modifying its contents outside of this function.  If False
                the raw dict is passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines):
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


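    # Sketch of the protect semantics of _execute_code() above (control file
    # path is hypothetical):
    #
    #     ns = {'machines': ['host1'], 'job': job}
    #     job._execute_code('/path/to/control.srv', ns)   # ns copied, unchanged
    #     job._execute_code(VERIFY_CONTROL_FILE, ns, protect=False)  # ns may be mutated
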
    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record, as
        calling this directly would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


class log_collector(object):
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """A method that collects all the current results of a running
        client job into the results dir. By default does nothing as no
        client job is running, but when running a client job you can override
        this with something that will actually do something."""

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We will squirrel the client side keyval away
                # and move it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")

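    # Lines the parsers above are meant to match, as emitted on stderr by the
    # autotest client (contents illustrative; tabs rendered as \t):
    #
    #     AUTOTEST_STATUS:2.1:GOOD\t----\tsleeptest\tcompleted successfully
    #     AUTOTEST_TEST_COMPLETE:/tmp/autotest_fifo
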
    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


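    # For example, in _process_quoted_line() above a line tagged "2.1" is
    # filed under self.logs[2][1]["logs"]; _process_log_dict() then flushes
    # group 2.1 before 2.2 and both before group 3, giving the ordering
    # described in _process_logs().
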
    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass