blob: 8b6c2c662fa9bddf99326d227c423b4b08c9dd20 [file] [log] [blame]
mblighf1c52842007-10-16 15:21:38 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
mblighdbdac6c2008-03-05 15:49:58 +000014import os, sys, re, time, select, subprocess, traceback
mblighf1c52842007-10-16 15:21:38 +000015
mbligh6437ff52008-04-17 15:24:38 +000016from autotest_lib.client.bin import fd_stack
mbligh302482e2008-05-01 20:06:16 +000017from autotest_lib.client.common_lib import error, logging
mbligh6437ff52008-04-17 15:24:38 +000018from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
jadmanski63aa3892008-06-06 16:38:28 +000020from autotest_lib.client.common_lib import utils
mbligh6437ff52008-04-17 15:24:38 +000021
mbligh3f4bced2007-11-05 17:55:53 +000022
# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    """Return the text of the named control segment, or '' if absent.

    Segments live in <server_dir>/control_segments next to this file.
    A missing segment is not an error: optional site-specific hooks
    (e.g. "site_verify") simply contribute an empty string.
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if not os.path.exists(script_file):
        return ""
    # use open() instead of the deprecated file() constructor, and close
    # the handle explicitly instead of leaking it to the garbage collector
    segment = open(script_file)
    try:
        return segment.read()
    finally:
        segment.close()
mblighed5a4102007-11-20 00:46:41 +000032
33
# Boilerplate prepended to every control segment before it is exec'd.
# It imports the standard server-side namespace, attaches the global
# 'job' object (supplied in the exec namespace) to the Autotest and
# SSHHost classes, and records the machine list for multi-machine jobs.
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
52
# Control segment used when the job wraps a client-side control file:
# for each machine it pushes the client control file over ssh and runs
# it there via the Autotest front-end.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, password, port = parse_machine(machine,
                                                   ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, password=password)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
65
# Control segment that collects any crashdumps generated on each machine
# since test_start_time (provided in the exec namespace).  Run with
# log=False so the collection itself is not recorded in the status log.
crashdumps = """
def crashdumps(machine):
    hostname, user, password, port = parse_machine(machine,
                                                   ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
                         password=password)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
77
# Control segment that reboots every machine at the end of the job
# (used by run() when reboot=True).  Not recorded in the status log.
reboot_segment="""\
def reboot(machine):
    hostname, user, password, port = parse_machine(machine,
                                                   ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
                         password=password)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""
89
# Control segment that (re)installs the OS on every machine; used by
# run() for install_before/install_after.  Not recorded in the status log.
install="""\
def install(machine):
    hostname, user, password, port = parse_machine(machine,
                                                   ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
                         password=password)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
101
# load up the verifier control segment, with an optional site-specific hook
# (a missing segment contributes "", so the hooks are truly optional)
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
109
mbligh1d42d4e2007-11-05 22:42:00 +0000110
# load up site-specific code for generating site-specific job data
# (merged into the job keyval file by base_server_job.__init__)
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
mbligh970b94e2008-01-24 16:29:34 +0000120
121
class base_server_job:
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            control
                    the control file for this job
    """

    # version of the status-log format this job writes; stamped into the
    # job keyvals so the results parser knows how to interpret the logs
    STATUS_VERSION = 1
jadmanski6e8bf752008-05-14 00:17:48 +0000141
142
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job="",
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
                The control file (pathname of)
        args
                args to pass to the control file
        resultdir
                where to throw the results
        label
                label for the job
        user
                Username for the job (email address)
        machines
                list of machine names the job runs against
        client
                True if a client-side control file
        parse_job
                tag identifying this job in the results database;
                non-empty enables continuous parsing (single machine only)
        ssh_user, ssh_port, ssh_pass
                ssh credentials used to reach the machines
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            # strip DOS line endings from the control file
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # record_prefix holds the current indentation (tabs) for status lines
        self.record_prefix = ''
        # file-like objects polled by _read_warnings() for console warnings
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        # capture stdout/stderr so they can be redirected per sub-command
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        # start each run with a fresh status file
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        # continuous parsing only works for a single-machine job; the
        # multi-machine case is handled per-child in parallel_simple()
        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
mbligh6437ff52008-04-17 15:24:38 +0000208
209
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (0 = unbuffered, so
        # the log survives a crash)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # job already present: reuse its indices on our model
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
mbligh6437ff52008-04-17 15:24:38 +0000234
235
    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        # flush any tests still pending in the parser into the db
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False
mbligh6437ff52008-04-17 15:24:38 +0000246
mblighf1c52842007-10-16 15:21:38 +0000247
    def verify(self):
        """Run the 'verify' control segment against self.machines.

        Raises AutoservError when no machines are configured.  Any
        failure is recorded as an ABORT in the status log and re-raised
        to the caller.
        """
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self, \
                         'ssh_user' : self.ssh_user, \
                         'ssh_port' : self.ssh_port, \
                         'ssh_pass' : self.ssh_pass}
            exec(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise
mblighe25fd5b2008-01-22 17:23:37 +0000263
264
    def repair(self):
        """Run the 'repair' control segment against self.machines,
        then re-verify them regardless of whether repair succeeded.

        Raises AutoservError when no machines are configured.
        """
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to repair')
        namespace = {'machines' : self.machines, 'job' : self, \
                     'ssh_user' : self.ssh_user, \
                     'ssh_port' : self.ssh_port, \
                     'ssh_pass' : self.ssh_pass}
        # no matter what happens during repair, go on to try to reverify
        try:
            exec(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()
mblighe25fd5b2008-01-22 17:23:37 +0000280
281
jadmanski0afbb632008-06-06 21:10:57 +0000282 def enable_external_logging(self):
283 """Start or restart external logging mechanism.
284 """
285 pass
mblighcaa62c22008-04-07 21:51:17 +0000286
287
jadmanski0afbb632008-06-06 21:10:57 +0000288 def disable_external_logging(self):
289 """ Pause or stop external logging mechanism.
290 """
291 pass
mblighcaa62c22008-04-07 21:51:17 +0000292
293
jadmanski0afbb632008-06-06 21:10:57 +0000294 def use_external_logging(self):
295 """Return True if external logging should be used.
296 """
297 return False
mblighcaa62c22008-04-07 21:51:17 +0000298
299
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # we are "forking" unless this is exactly the job's own single
        # machine (in which case __init__ already set up the parser)
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking:
            def wrapper(machine):
                # NOTE(review): this mutates self (parse_job, machines,
                # resultdir) -- presumably subcommand.parallel_simple runs
                # each wrapper in a forked child so the parent job object
                # is unaffected; confirm before reusing elsewhere
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
mbligh6437ff52008-04-17 15:24:38 +0000321
322
    def run(self, reboot = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        """Execute the job's control file.

        reboot: reboot all machines after the run
        install_before/install_after: reinstall machines around the run
        collect_crashdumps: gather crashdumps generated during the run
        namespace: extra names made visible to the exec'd control file
        """
        # use a copy so changes don't affect the original dictionary
        # (this also makes the mutable {} default argument safe)
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        # NOTE(review): status_log is computed but never used here
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                exec(preamble + install, namespace, namespace)
            if self.client:
                # client-side control file: write it out and run it on
                # the machines via the client_wrapper segment
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            exec(preamble + server_control, namespace, namespace)

        finally:
            # always attempt crashdump collection / cleanup segments,
            # even if the control file raised
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                exec(preamble + crashdumps,
                     namespace, namespace)
            self.disable_external_logging()
            if reboot and machines:
                exec(preamble + reboot_segment,
                     namespace, namespace)
            if install_after and machines:
                exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000367
mblighf1c52842007-10-16 15:21:38 +0000368
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Records GOOD/TEST_NA/FAIL in the status log; never re-raises
        test failures.
        """

        (group, testname) = test.testname(url)
        tag = None
        subdir = testname

        # an optional 'tag' kwarg distinguishes repeated runs of the
        # same test; it is consumed here, not passed to the test
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                subdir += '.' + tag

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        try:
            test.runtest(self, url, tag, args, dargs)
            self.record('GOOD', subdir, testname, 'completed successfully')
        except error.TestNAError, detail:
            self.record('TEST_NA', subdir, testname, str(detail))
        except Exception, detail:
            info = str(detail) + "\n" + traceback.format_exc()
            self.record('FAIL', subdir, testname, info)
mblighf1c52842007-10-16 15:21:38 +0000404
405
    def run_group(self, function, *args, **dargs):
        """\
        Run 'function' as a named, indented group in the status log.

        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the function's result (None if it raised).
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
            except Exception, e:
                self.record_prefix = old_record_prefix
                err_msg = str(e) + '\n'
                err_msg += traceback.format_exc()
                self.record('END FAIL', None, name, err_msg)
            else:
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)

        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except error.TestError:
            pass
        except:
            raise error.TestError(name + ' failed\n' +
                                  traceback.format_exc())

        return result
mblighf1c52842007-10-16 15:21:38 +0000449
450
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # only query the kernel after a successful reboot
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
jadmanskif35bbb62008-05-29 21:36:04 +0000477
478
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self.__record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self.__record(status_code, subdir, operation, status,
                      optional_fields=optional_fields)
mblighdab39662008-02-27 16:47:55 +0000520
521
    def _read_warnings(self):
        """Drain all pending warnings from self.warning_loggers.

        Returns a list of (epoch_timestamp, message) tuples sorted by
        timestamp.  Loggers whose pipes have closed are removed from
        self.warning_loggers.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read (timeout 0 = non-blocking poll)
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each warning line is "<timestamp>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
mblighf4e04152008-02-21 16:05:53 +0000549
550
jadmanski0afbb632008-06-06 21:10:57 +0000551 def _render_record(self, status_code, subdir, operation, status='',
552 epoch_time=None, record_prefix=None,
553 optional_fields=None):
554 """
555 Internal Function to generate a record to be written into a
556 status log. For use by server_job.* classes only.
557 """
558 if subdir:
559 if re.match(r'[\n\t]', subdir):
560 raise ValueError(
561 'Invalid character in subdir string')
562 substr = subdir
563 else:
564 substr = '----'
mbligh6437ff52008-04-17 15:24:38 +0000565
jadmanski0afbb632008-06-06 21:10:57 +0000566 if not logging.is_valid_status(status_code):
567 raise ValueError('Invalid status code supplied: %s' %
568 status_code)
569 if not operation:
570 operation = '----'
571 if re.match(r'[\n\t]', operation):
572 raise ValueError(
573 'Invalid character in operation string')
574 operation = operation.rstrip()
575 status = status.rstrip()
576 status = re.sub(r"\t", " ", status)
577 # Ensure any continuation lines are marked so we can
578 # detect them in the status file to ensure it is parsable.
579 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
mblighf1c52842007-10-16 15:21:38 +0000580
jadmanski0afbb632008-06-06 21:10:57 +0000581 if not optional_fields:
582 optional_fields = {}
jadmanskif35bbb62008-05-29 21:36:04 +0000583
jadmanski0afbb632008-06-06 21:10:57 +0000584 # Generate timestamps for inclusion in the logs
585 if epoch_time is None:
586 epoch_time = int(time.time())
587 local_time = time.localtime(epoch_time)
588 optional_fields["timestamp"] = str(epoch_time)
589 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
590 local_time)
jadmanskif35bbb62008-05-29 21:36:04 +0000591
jadmanski0afbb632008-06-06 21:10:57 +0000592 fields = [status_code, substr, operation]
593 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
594 fields.append(status)
mbligh30270302007-11-05 20:33:52 +0000595
jadmanski0afbb632008-06-06 21:10:57 +0000596 if record_prefix is None:
597 record_prefix = self.record_prefix
mblighdab39662008-02-27 16:47:55 +0000598
jadmanski0afbb632008-06-06 21:10:57 +0000599 msg = '\t'.join(str(x) for x in fields)
jadmanskif35bbb62008-05-29 21:36:04 +0000600
jadmanski0afbb632008-06-06 21:10:57 +0000601 return record_prefix + msg + '\n'
mblighdab39662008-02-27 16:47:55 +0000602
603
    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike __record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        # feed the newly written lines to the continuous parser, if any
        self.__parse_status(lines)
mblighdab39662008-02-27 16:47:55 +0000621
622
    def __record(self, status_code, subdir, operation, status='',
                 epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.

        Writes the rendered line to stdout, the job-level status.log,
        and (when subdir is given) the per-test status file, then feeds
        it to the continuous parser.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # NOTE(review): the file handles opened below are never closed
        # explicitly; this relies on CPython's refcounting to flush them
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())
mblighb03ba642008-03-13 17:37:17 +0000644
645
    def __parse_status(self, new_lines):
        """Feed new status-log lines to the continuous parser and
        insert any tests it completes into the results db.  No-op
        when continuous parsing is disabled."""
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)
jadmanski28816c22008-05-21 18:18:05 +0000652
653
    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # deliberately swallow db errors; just warn on stderr
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
mblighdab39662008-02-27 16:47:55 +0000666
667
# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    # matches a serialized status line: AUTOTEST_STATUS:<tag>:<payload>
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # captures the leading tab indentation of a rendered status line
    extract_indent = re.compile(r"^(\t*).*$")
mblighdab39662008-02-27 16:47:55 +0000677
jadmanski0afbb632008-06-06 21:10:57 +0000678 def __init__(self, job):
679 self.job = job
680 self.leftover = ""
681 self.last_line = ""
682 self.logs = {}
mblighdab39662008-02-27 16:47:55 +0000683
684
jadmanski0afbb632008-06-06 21:10:57 +0000685 def _process_log_dict(self, log_dict):
686 log_list = log_dict.pop("logs", [])
687 for key in sorted(log_dict.iterkeys()):
688 log_list += self._process_log_dict(log_dict.pop(key))
689 return log_list
mblighdab39662008-02-27 16:47:55 +0000690
691
    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            # remember the last flushed line for warning indentation
            self.last_line = log_list[-1]
mblighdab39662008-02-27 16:47:55 +0000710
711
    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # walk/build the nested dict keyed by the numeric tag parts
            # (e.g. tag "1.2" buffers under self.logs[1][2]["logs"])
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)
mblighdab39662008-02-27 16:47:55 +0000730
731
    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            # not a status line: pass it straight through to stdout
            print line
mblighdab39662008-02-27 16:47:55 +0000743
744
    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warning tuples as WARN status lines,
        indented to match the surrounding log context."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]
mblighdab39662008-02-27 16:47:55 +0000754
755
    def _process_warnings(self, last_line, log_dict, warnings):
        """Attach rendered warnings at the deepest active point(s) of
        the buffered log tree rooted at log_dict."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)
mblighdab39662008-02-27 16:47:55 +0000774
775
    def write(self, data):
        """File-object interface used by utils.run(): buffer *data*,
        process complete lines, and keep any trailing partial line."""
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]
mblighdab39662008-02-27 16:47:55 +0000789
790
    def flush(self):
        # our own buffering is line-oriented and flushed as lines
        # complete; only the underlying stdout needs an explicit flush
        sys.stdout.flush()
mblighdab39662008-02-27 16:47:55 +0000793
794
    def close(self):
        """Flush any half-finished line, drain all buffered tagged
        logs, and flush stdout."""
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
mblighcaa62c22008-04-07 21:51:17 +0000800
# site_server_job.py may be non-existent or empty, make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site customization available: fall back to the base behaviour
    class site_server_job(base_server_job):
        pass
808
# the concrete job class used by callers: base behaviour plus any
# site-specific customizations layered in via site_server_job
class server_job(site_server_job):
    pass