blob: c37c96c944d86fee52c3de7ce283772e3586f5f7 [file] [log] [blame]
mblighf1c52842007-10-16 15:21:38 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
mblighdbdac6c2008-03-05 15:49:58 +000014import os, sys, re, time, select, subprocess, traceback
mbligh03f4fc72007-11-29 20:56:14 +000015import test
mblighf1c52842007-10-16 15:21:38 +000016from utils import *
mblighf31b0c02007-11-29 18:19:22 +000017from common.error import *
mblighf1c52842007-10-16 15:21:38 +000018
mbligh3f4bced2007-11-05 17:55:53 +000019# this magic incantation should give us access to a client library
20server_dir = os.path.dirname(__file__)
21client_dir = os.path.join(server_dir, "..", "client", "bin")
22sys.path.append(client_dir)
23import fd_stack
24sys.path.pop()
25
mblighed5a4102007-11-20 00:46:41 +000026# load up a control segment
27# these are all stored in <server_dir>/control_segments
28def load_control_segment(name):
29 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh7f86e0b2007-11-24 19:45:07 +000030 script_file = os.path.join(server_dir, "control_segments", name)
mblighed5a4102007-11-20 00:46:41 +000031 if os.path.exists(script_file):
32 return file(script_file).read()
33 else:
34 return ""
35
36
mblighf1c52842007-10-16 15:21:38 +000037preamble = """\
38import os, sys
39
mblighb3c9f372008-01-14 16:39:44 +000040import hosts, autotest, kvm, git, standalone_profiler
mblighd0868ab2007-12-04 22:47:46 +000041import source_kernel, rpm_kernel, deb_kernel, git_kernel
mbligh03f4fc72007-11-29 20:56:14 +000042from common.error import *
mblighe1417fa2007-12-10 16:55:13 +000043from common import barrier
mblighf1c52842007-10-16 15:21:38 +000044from subcommand import *
45from utils import run, get_tmp_dir, sh_escape
46
mbligh119c12a2007-11-12 22:13:44 +000047autotest.Autotest.job = job
mbligh31a49de2007-11-05 18:41:19 +000048hosts.SSHHost.job = job
mblighe1417fa2007-12-10 16:55:13 +000049barrier = barrier.barrier
mbligh1fb77cc2008-02-27 16:41:20 +000050
51if len(machines) > 1:
52 open('.machines', 'w').write('\\n'.join(machines) + '\\n')
mblighf1c52842007-10-16 15:21:38 +000053"""
54
55client_wrapper = """
56at = autotest.Autotest()
57
58def run_client(machine):
59 host = hosts.SSHHost(machine)
60 at.run(control, host=host)
61
mbligh1fb77cc2008-02-27 16:41:20 +000062parallel_simple(run_client, machines)
mblighf1c52842007-10-16 15:21:38 +000063"""
64
mbligh303ccac2007-11-05 18:07:28 +000065crashdumps = """
66def crashdumps(machine):
67 host = hosts.SSHHost(machine, initialize=False)
68 host.get_crashdumps(test_start_time)
69
70parallel_simple(crashdumps, machines, log=False)
71"""
72
mbligh98ff1462007-12-19 16:27:55 +000073reboot_segment="""\
74def reboot(machine):
mbligh17f0c662007-11-05 18:28:19 +000075 host = hosts.SSHHost(machine, initialize=False)
76 host.reboot()
mblighf1c52842007-10-16 15:21:38 +000077
mbligh98ff1462007-12-19 16:27:55 +000078parallel_simple(reboot, machines, log=False)
mblighf1c52842007-10-16 15:21:38 +000079"""
80
mblighf36243d2007-10-30 15:36:16 +000081install="""\
82def install(machine):
mbligh17f0c662007-11-05 18:28:19 +000083 host = hosts.SSHHost(machine, initialize=False)
84 host.machine_install()
mblighf36243d2007-10-30 15:36:16 +000085
mbligh009b25a2007-11-05 18:38:51 +000086parallel_simple(install, machines, log=False)
mblighf36243d2007-10-30 15:36:16 +000087"""
88
mbligh7f86e0b2007-11-24 19:45:07 +000089# load up the verifier control segment, with an optional site-specific hook
mblighed5a4102007-11-20 00:46:41 +000090verify = load_control_segment("site_verify")
91verify += load_control_segment("verify")
mbligh1d42d4e2007-11-05 22:42:00 +000092
mbligh7f86e0b2007-11-24 19:45:07 +000093# load up the repair control segment, with an optional site-specific hook
94repair = load_control_segment("site_repair")
95repair += load_control_segment("repair")
96
mbligh1d42d4e2007-11-05 22:42:00 +000097
mbligh970b94e2008-01-24 16:29:34 +000098# load up site-specific code for generating site-specific job data
99try:
100 import site_job
101 get_site_job_data = site_job.get_site_job_data
102 del site_job
103except ImportError:
104 # by default provide a stub that generates no site data
105 def get_site_job_data(job):
106 return {}
107
108
mblighf1c52842007-10-16 15:21:38 +0000109class server_job:
110 """The actual job against which we do everything.
111
112 Properties:
113 autodir
114 The top level autotest directory (/usr/local/autotest).
115 serverdir
116 <autodir>/server/
117 clientdir
118 <autodir>/client/
119 conmuxdir
120 <autodir>/conmux/
121 testdir
122 <autodir>/server/tests/
123 control
124 the control file for this job
125 """
126
mblighe8b37a92007-12-19 15:54:11 +0000127 def __init__(self, control, args, resultdir, label, user, machines,
128 client = False):
mblighf1c52842007-10-16 15:21:38 +0000129 """
130 control
131 The control file (pathname of)
132 args
133 args to pass to the control file
134 resultdir
135 where to throw the results
mbligh18420c22007-10-16 22:27:14 +0000136 label
137 label for the job
mblighf1c52842007-10-16 15:21:38 +0000138 user
139 Username for the job (email address)
140 client
141 True if a client-side control file
142 """
mbligh05269362007-10-16 16:58:11 +0000143 path = os.path.dirname(sys.modules['server_job'].__file__)
mblighf1c52842007-10-16 15:21:38 +0000144 self.autodir = os.path.abspath(os.path.join(path, '..'))
145 self.serverdir = os.path.join(self.autodir, 'server')
mbligh05269362007-10-16 16:58:11 +0000146 self.testdir = os.path.join(self.serverdir, 'tests')
147 self.tmpdir = os.path.join(self.serverdir, 'tmp')
mblighf1c52842007-10-16 15:21:38 +0000148 self.conmuxdir = os.path.join(self.autodir, 'conmux')
149 self.clientdir = os.path.join(self.autodir, 'client')
mblighe25fd5b2008-01-22 17:23:37 +0000150 if control:
151 self.control = open(control, 'r').read()
152 self.control = re.sub('\r', '', self.control)
153 else:
154 self.control = None
mblighf1c52842007-10-16 15:21:38 +0000155 self.resultdir = resultdir
156 if not os.path.exists(resultdir):
157 os.mkdir(resultdir)
mbligh3ccb8592007-11-05 18:13:40 +0000158 self.debugdir = os.path.join(resultdir, 'debug')
159 if not os.path.exists(self.debugdir):
160 os.mkdir(self.debugdir)
mbligh3dcf2c92007-10-16 22:24:00 +0000161 self.status = os.path.join(resultdir, 'status')
mbligh18420c22007-10-16 22:27:14 +0000162 self.label = label
mblighf1c52842007-10-16 15:21:38 +0000163 self.user = user
164 self.args = args
mblighe8b37a92007-12-19 15:54:11 +0000165 self.machines = machines
mblighf1c52842007-10-16 15:21:38 +0000166 self.client = client
167 self.record_prefix = ''
mblighf4e04152008-02-21 16:05:53 +0000168 self.warning_loggers = set()
mblighf1c52842007-10-16 15:21:38 +0000169
mbligh3f4bced2007-11-05 17:55:53 +0000170 self.stdout = fd_stack.fd_stack(1, sys.stdout)
171 self.stderr = fd_stack.fd_stack(2, sys.stderr)
172
mbligh3dcf2c92007-10-16 22:24:00 +0000173 if os.path.exists(self.status):
174 os.unlink(self.status)
mblighe8b37a92007-12-19 15:54:11 +0000175 job_data = { 'label' : label, 'user' : user,
176 'hostname' : ','.join(machines) }
mbligh970b94e2008-01-24 16:29:34 +0000177 job_data.update(get_site_job_data(self))
mblighf1c52842007-10-16 15:21:38 +0000178 write_keyval(self.resultdir, job_data)
179
180
mblighe25fd5b2008-01-22 17:23:37 +0000181 def verify(self):
182 if not self.machines:
183 raise AutoservError('No machines specified to verify')
184 try:
185 namespace = {'machines' : self.machines, 'job' : self}
186 exec(preamble + verify, namespace, namespace)
187 except Exception, e:
188 msg = 'Verify failed\n' + str(e) + '\n' + format_error()
189 self.record('ABORT', None, None, msg)
190 raise
191
192
193 def repair(self):
194 if not self.machines:
195 raise AutoservError('No machines specified to repair')
196 namespace = {'machines' : self.machines, 'job' : self}
mbligh16c722d2008-03-05 00:58:44 +0000197 # no matter what happens during repair, go on to try to reverify
198 try:
199 exec(preamble + repair, namespace, namespace)
200 except Exception, exc:
201 print 'Exception occured during repair'
202 traceback.print_exc()
mbligh8141f862008-01-25 17:20:40 +0000203 self.verify()
mblighe25fd5b2008-01-22 17:23:37 +0000204
205
mblighe8b37a92007-12-19 15:54:11 +0000206 def run(self, reboot = False, install_before = False,
mblighddd54332008-03-07 18:14:06 +0000207 install_after = False, collect_crashdumps = True,
208 namespace = {}):
mbligh60dbd502007-10-26 14:59:31 +0000209 # use a copy so changes don't affect the original dictionary
210 namespace = namespace.copy()
mblighe8b37a92007-12-19 15:54:11 +0000211 machines = self.machines
mbligh60dbd502007-10-26 14:59:31 +0000212
mblighfaf0cd42007-11-19 16:00:24 +0000213 self.aborted = False
mblighf1c52842007-10-16 15:21:38 +0000214 namespace['machines'] = machines
215 namespace['args'] = self.args
216 namespace['job'] = self
mbligh6e294382007-11-05 18:11:29 +0000217 test_start_time = int(time.time())
mblighf1c52842007-10-16 15:21:38 +0000218
mbligh87c5d882007-10-29 17:07:24 +0000219 os.chdir(self.resultdir)
220
221 status_log = os.path.join(self.resultdir, 'status.log')
mblighf1c52842007-10-16 15:21:38 +0000222 try:
mblighf36243d2007-10-30 15:36:16 +0000223 if install_before and machines:
224 exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000225 if self.client:
226 namespace['control'] = self.control
227 open('control', 'w').write(self.control)
228 open('control.srv', 'w').write(client_wrapper)
229 server_control = client_wrapper
230 else:
231 open('control.srv', 'w').write(self.control)
232 server_control = self.control
mblighf1c52842007-10-16 15:21:38 +0000233 exec(preamble + server_control, namespace, namespace)
234
235 finally:
mblighddd54332008-03-07 18:14:06 +0000236 if machines and collect_crashdumps:
mbligh6e294382007-11-05 18:11:29 +0000237 namespace['test_start_time'] = test_start_time
mbligh98ff1462007-12-19 16:27:55 +0000238 exec(preamble + crashdumps,
239 namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000240 if reboot and machines:
mbligh98ff1462007-12-19 16:27:55 +0000241 exec(preamble + reboot_segment,
242 namespace, namespace)
mblighf36243d2007-10-30 15:36:16 +0000243 if install_after and machines:
244 exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000245
246
247 def run_test(self, url, *args, **dargs):
248 """Summon a test object and run it.
249
250 tag
251 tag to add to testname
252 url
253 url of the test to run
254 """
255
mblighf1c52842007-10-16 15:21:38 +0000256 (group, testname) = test.testname(url)
257 tag = None
258 subdir = testname
mbligh43ac5222007-10-16 15:55:01 +0000259
mblighf1c52842007-10-16 15:21:38 +0000260 if dargs.has_key('tag'):
261 tag = dargs['tag']
262 del dargs['tag']
263 if tag:
264 subdir += '.' + tag
mblighf1c52842007-10-16 15:21:38 +0000265
mbligh43ac5222007-10-16 15:55:01 +0000266 try:
267 test.runtest(self, url, tag, args, dargs)
268 self.record('GOOD', subdir, testname, 'completed successfully')
269 except Exception, detail:
mbligh4e61c4e2008-04-07 18:36:59 +0000270 self.record('FAIL', subdir, testname, str(detail) + "\n" + format_error())
mblighf1c52842007-10-16 15:21:38 +0000271
272
273 def run_group(self, function, *args, **dargs):
274 """\
275 function:
276 subroutine to run
277 *args:
278 arguments for the function
279 """
280
281 result = None
282 name = function.__name__
283
284 # Allow the tag for the group to be specified.
285 if dargs.has_key('tag'):
286 tag = dargs['tag']
287 del dargs['tag']
288 if tag:
289 name = tag
290
291 # if tag:
292 # name += '.' + tag
293 old_record_prefix = self.record_prefix
294 try:
295 try:
296 self.record('START', None, name)
297 self.record_prefix += '\t'
298 result = function(*args, **dargs)
299 self.record_prefix = old_record_prefix
300 self.record('END GOOD', None, name)
301 except:
302 self.record_prefix = old_record_prefix
303 self.record('END FAIL', None, name, format_error())
304 # We don't want to raise up an error higher if it's just
305 # a TestError - we want to carry on to other tests. Hence
306 # this outer try/except block.
307 except TestError:
308 pass
309 except:
310 raise TestError(name + ' failed\n' + format_error())
311
312 return result
313
314
mblighf4e04152008-02-21 16:05:53 +0000315 def record(self, status_code, subdir, operation, status=''):
mblighf1c52842007-10-16 15:21:38 +0000316 """
317 Record job-level status
318
319 The intent is to make this file both machine parseable and
320 human readable. That involves a little more complexity, but
321 really isn't all that bad ;-)
322
323 Format is <status code>\t<subdir>\t<operation>\t<status>
324
325 status code: (GOOD|WARN|FAIL|ABORT)
326 or START
327 or END (GOOD|WARN|FAIL|ABORT)
328
329 subdir: MUST be a relevant subdirectory in the results,
330 or None, which will be represented as '----'
331
332 operation: description of what you ran (e.g. "dbench", or
333 "mkfs -t foobar /dev/sda9")
334
335 status: error message or "completed sucessfully"
336
337 ------------------------------------------------------------
338
339 Initial tabs indicate indent levels for grouping, and is
340 governed by self.record_prefix
341
342 multiline messages have secondary lines prefaced by a double
343 space (' ')
mblighf4e04152008-02-21 16:05:53 +0000344
345 Executing this method will trigger the logging of all new
346 warnings to date from the various console loggers.
347 """
mblighdab39662008-02-27 16:47:55 +0000348 # poll all our warning loggers for new warnings
349 warnings = self._read_warnings()
350 for timestamp, msg in warnings:
351 self.__record("WARN", None, None, msg, timestamp)
352
353 # write out the actual status log line
354 self.__record(status_code, subdir, operation, status)
355
356
357 def _read_warnings(self):
mblighf4e04152008-02-21 16:05:53 +0000358 warnings = []
359 while True:
360 # pull in a line of output from every logger that has
361 # output ready to be read
362 loggers, _, _ = select.select(self.warning_loggers,
363 [], [], 0)
364 closed_loggers = set()
365 for logger in loggers:
366 line = logger.readline()
367 # record any broken pipes (aka line == empty)
368 if len(line) == 0:
369 closed_loggers.add(logger)
370 continue
371 timestamp, msg = line.split('\t', 1)
372 warnings.append((int(timestamp), msg.strip()))
373
374 # stop listening to loggers that are closed
375 self.warning_loggers -= closed_loggers
376
377 # stop if none of the loggers have any output left
378 if not loggers:
379 break
380
mblighdab39662008-02-27 16:47:55 +0000381 # sort into timestamp order
382 warnings.sort()
383 return warnings
mblighf4e04152008-02-21 16:05:53 +0000384
385
mblighdab39662008-02-27 16:47:55 +0000386 def _render_record(self, status_code, subdir, operation, status='',
387 epoch_time=None, record_prefix=None):
mblighf4e04152008-02-21 16:05:53 +0000388 """
mblighdab39662008-02-27 16:47:55 +0000389 Internal Function to generate a record to be written into a
390 status log. For use by server_job.* classes only.
mblighf1c52842007-10-16 15:21:38 +0000391 """
mblighf1c52842007-10-16 15:21:38 +0000392 if subdir:
393 if re.match(r'[\n\t]', subdir):
mbligh4d6feff2008-01-14 16:48:56 +0000394 raise ValueError('Invalid character in subdir string')
mblighf1c52842007-10-16 15:21:38 +0000395 substr = subdir
396 else:
397 substr = '----'
398
399 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
400 status_code):
mbligh4d6feff2008-01-14 16:48:56 +0000401 raise ValueError('Invalid status code supplied: %s' % status_code)
mblighe25fd5b2008-01-22 17:23:37 +0000402 if not operation:
403 operation = '----'
mblighf1c52842007-10-16 15:21:38 +0000404 if re.match(r'[\n\t]', operation):
mbligh4d6feff2008-01-14 16:48:56 +0000405 raise ValueError('Invalid character in operation string')
mblighf1c52842007-10-16 15:21:38 +0000406 operation = operation.rstrip()
407 status = status.rstrip()
408 status = re.sub(r"\t", " ", status)
409 # Ensure any continuation lines are marked so we can
410 # detect them in the status file to ensure it is parsable.
411 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
412
mbligh30270302007-11-05 20:33:52 +0000413 # Generate timestamps for inclusion in the logs
mblighf4e04152008-02-21 16:05:53 +0000414 if epoch_time is None:
415 epoch_time = int(time.time())
mbligh30270302007-11-05 20:33:52 +0000416 local_time = time.localtime(epoch_time)
417 epoch_time_str = "timestamp=%d" % (epoch_time,)
418 local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
419 local_time)
420
mblighdab39662008-02-27 16:47:55 +0000421 if record_prefix is None:
422 record_prefix = self.record_prefix
423
mbligh30270302007-11-05 20:33:52 +0000424 msg = '\t'.join(str(x) for x in (status_code, substr, operation,
425 epoch_time_str, local_time_str,
426 status))
mblighdab39662008-02-27 16:47:55 +0000427 return record_prefix + msg + '\n'
428
429
430 def _record_prerendered(self, msg):
431 """
432 Record a pre-rendered msg into the status logs. The only
433 change this makes to the message is to add on the local
434 indentation. Should not be called outside of server_job.*
435 classes. Unlike __record, this does not write the message
436 to standard output.
437 """
438 status_file = os.path.join(self.resultdir, 'status.log')
439 status_log = open(status_file, 'a')
mblighb03ba642008-03-13 17:37:17 +0000440 need_reparse = False
mblighdab39662008-02-27 16:47:55 +0000441 for line in msg.splitlines():
442 line = self.record_prefix + line + '\n'
443 status_log.write(line)
mblighb03ba642008-03-13 17:37:17 +0000444 if self.__need_reparse(line):
445 need_reparse = True
mblighdab39662008-02-27 16:47:55 +0000446 status_log.close()
mblighb03ba642008-03-13 17:37:17 +0000447 if need_reparse:
448 self.__parse_status()
mblighdab39662008-02-27 16:47:55 +0000449
450
451 def __record(self, status_code, subdir, operation, status='',
452 epoch_time=None):
453 """
454 Actual function for recording a single line into the status
455 logs. Should never be called directly, only by job.record as
456 this would bypass the console monitor logging.
457 """
458
459 msg = self._render_record(status_code, subdir, operation,
460 status, epoch_time)
461
mblighf1c52842007-10-16 15:21:38 +0000462
mbligh31a49de2007-11-05 18:41:19 +0000463 status_file = os.path.join(self.resultdir, 'status.log')
mblighdab39662008-02-27 16:47:55 +0000464 sys.stdout.write(msg)
465 open(status_file, "a").write(msg)
mblighf1c52842007-10-16 15:21:38 +0000466 if subdir:
mblighd56eb592008-01-22 16:36:34 +0000467 test_dir = os.path.join(self.resultdir, subdir)
468 if not os.path.exists(test_dir):
469 os.mkdir(test_dir)
470 status_file = os.path.join(test_dir, 'status')
mblighdab39662008-02-27 16:47:55 +0000471 open(status_file, "a").write(msg)
mblighb03ba642008-03-13 17:37:17 +0000472 if self.__need_reparse(msg):
473 self.__parse_status()
474
475
476 def __need_reparse(self, line):
477 # the parser will not record results if lines have more than
478 # one level of indentation
479 indent = len(re.search(r"^(\t*)", line).group(1))
480 if indent > 1:
481 return False
482 # we can also skip START lines, as they add nothing
483 line = line.lstrip("\t")
484 if line.startswith("START\t"):
485 return False
486 # otherwise, we should do a parse
487 return True
mblighdbdac6c2008-03-05 15:49:58 +0000488
489
490 def __parse_status(self):
491 """
492 If a .parse.cmd file is present in the results directory,
493 launch the tko parser.
494 """
495 cmdfile = os.path.join(self.resultdir, '.parse.cmd')
496 if os.path.exists(cmdfile):
497 cmd = open(cmdfile).read().strip()
498 subprocess.Popen(cmd, shell=True)
mblighdab39662008-02-27 16:47:55 +0000499
500
501# a file-like object for catching stderr from an autotest client and
502# extracting status logs from it
503class client_logger(object):
504 """Partial file object to write to both stdout and
505 the status log file. We only implement those methods
506 utils.run() actually calls.
507 """
508 parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
509 extract_indent = re.compile(r"^(\t*).*$")
510
511 def __init__(self, job):
512 self.job = job
513 self.leftover = ""
514 self.last_line = ""
515 self.logs = {}
516
517
518 def _process_log_dict(self, log_dict):
519 log_list = log_dict.pop("logs", [])
520 for key in sorted(log_dict.iterkeys()):
521 log_list += self._process_log_dict(log_dict.pop(key))
522 return log_list
523
524
525 def _process_logs(self):
526 """Go through the accumulated logs in self.log and print them
527 out to stdout and the status log. Note that this processes
528 logs in an ordering where:
529
530 1) logs to different tags are never interleaved
531 2) logs to x.y come before logs to x.y.z for all z
532 3) logs to x.y come before x.z whenever y < z
533
534 Note that this will in general not be the same as the
535 chronological ordering of the logs. However, if a chronological
536 ordering is desired that one can be reconstructed from the
537 status log by looking at timestamp lines."""
538 log_list = self._process_log_dict(self.logs)
539 for line in log_list:
540 self.job._record_prerendered(line + '\n')
541 if log_list:
542 self.last_line = log_list[-1]
543
544
545 def _process_quoted_line(self, tag, line):
546 """Process a line quoted with an AUTOTEST_STATUS flag. If the
547 tag is blank then we want to push out all the data we've been
548 building up in self.logs, and then the newest line. If the
549 tag is not blank, then push the line into the logs for handling
550 later."""
551 print line
552 if tag == "":
553 self._process_logs()
554 self.job._record_prerendered(line + '\n')
555 self.last_line = line
556 else:
557 tag_parts = [int(x) for x in tag.split(".")]
558 log_dict = self.logs
559 for part in tag_parts:
560 log_dict = log_dict.setdefault(part, {})
561 log_list = log_dict.setdefault("logs", [])
562 log_list.append(line)
563
564
565 def _process_line(self, line):
566 """Write out a line of data to the appropriate stream. Status
567 lines sent by autotest will be prepended with
568 "AUTOTEST_STATUS", and all other lines are ssh error
569 messages."""
570 match = self.parser.search(line)
571 if match:
572 tag, line = match.groups()
573 self._process_quoted_line(tag, line)
574 else:
mblighfe749d22008-03-07 18:14:46 +0000575 print line
mblighdab39662008-02-27 16:47:55 +0000576
577
578 def _format_warnings(self, last_line, warnings):
mbligh71d340d2008-03-05 15:51:16 +0000579 # use the indentation of whatever the last log line was
mblighdab39662008-02-27 16:47:55 +0000580 indent = self.extract_indent.match(last_line).group(1)
mbligh71d340d2008-03-05 15:51:16 +0000581 # if the last line starts a new group, add an extra indent
582 if last_line.lstrip('\t').startswith("START\t"):
583 indent += '\t'
mblighdab39662008-02-27 16:47:55 +0000584 return [self.job._render_record("WARN", None, None, msg,
585 timestamp, indent).rstrip('\n')
586 for timestamp, msg in warnings]
587
588
589 def _process_warnings(self, last_line, log_dict, warnings):
590 if log_dict.keys() in ([], ["logs"]):
591 # there are no sub-jobs, just append the warnings here
592 warnings = self._format_warnings(last_line, warnings)
593 log_list = log_dict.setdefault("logs", [])
594 log_list += warnings
595 for warning in warnings:
596 sys.stdout.write(warning + '\n')
597 else:
598 # there are sub-jobs, so put the warnings in there
599 log_list = log_dict.get("logs", [])
600 if log_list:
601 last_line = log_list[-1]
602 for key in sorted(log_dict.iterkeys()):
603 if key != "logs":
604 self._process_warnings(last_line,
605 log_dict[key],
606 warnings)
607
608
609 def write(self, data):
610 # first check for any new console warnings
611 warnings = self.job._read_warnings()
612 self._process_warnings(self.last_line, self.logs, warnings)
613 # now process the newest data written out
614 data = self.leftover + data
615 lines = data.split("\n")
616 # process every line but the last one
617 for line in lines[:-1]:
618 self._process_line(line)
619 # save the last line for later processing
620 # since we may not have the whole line yet
621 self.leftover = lines[-1]
622
623
624 def flush(self):
625 sys.stdout.flush()
mblighdab39662008-02-27 16:47:55 +0000626
627
628 def close(self):
629 if self.leftover:
630 self._process_line(self.leftover)
631 self._process_logs()
632 self.flush()