blob: a32869b988b4261daece04c945f9a850e308b68c [file] [log] [blame]
mblighf1c52842007-10-16 15:21:38 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
mblighdbdac6c2008-03-05 15:49:58 +000014import os, sys, re, time, select, subprocess, traceback
mbligh03f4fc72007-11-29 20:56:14 +000015import test
mblighf1c52842007-10-16 15:21:38 +000016from utils import *
mblighf31b0c02007-11-29 18:19:22 +000017from common.error import *
mblighf1c52842007-10-16 15:21:38 +000018
mbligh3f4bced2007-11-05 17:55:53 +000019# this magic incantation should give us access to a client library
20server_dir = os.path.dirname(__file__)
21client_dir = os.path.join(server_dir, "..", "client", "bin")
22sys.path.append(client_dir)
23import fd_stack
24sys.path.pop()
25
mblighed5a4102007-11-20 00:46:41 +000026# load up a control segment
27# these are all stored in <server_dir>/control_segments
28def load_control_segment(name):
29 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh7f86e0b2007-11-24 19:45:07 +000030 script_file = os.path.join(server_dir, "control_segments", name)
mblighed5a4102007-11-20 00:46:41 +000031 if os.path.exists(script_file):
32 return file(script_file).read()
33 else:
34 return ""
35
36
# Boilerplate prepended to every control segment before it is exec'd by
# server_job: imports the host/autotest/kernel helper modules, binds the
# current job object onto the Autotest and SSHHost classes, and (for
# multi-machine jobs) records the machine list in a .machines file.
# NOTE: this is source text for exec(), not code run at import time.
preamble = """\
import os, sys

import hosts, autotest, kvm, git, standalone_profiler
import source_kernel, rpm_kernel, deb_kernel, git_kernel
from common.error import *
from common import barrier
from subcommand import *
from utils import run, get_tmp_dir, sh_escape

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier

if len(machines) > 1:
	open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
54
# Control segment used when the job carries a client-side control file:
# pushes the client control file to each machine (in parallel) and runs
# it there via the Autotest harness.  Expects 'control' and 'machines'
# to be present in the exec namespace.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
	host = hosts.SSHHost(machine)
	at.run(control, host=host)

parallel_simple(run_client, machines)
"""
64
# Control segment run after the main job body (even on failure) to pull
# back any crashdumps generated since 'test_start_time' from every
# machine.  log=False keeps this housekeeping out of the status log.
crashdumps = """
def crashdumps(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.get_crashdumps(test_start_time)

parallel_simple(crashdumps, machines, log=False)
"""
72
# Control segment that reboots every machine in parallel; used when the
# job was invoked with reboot=True.
reboot_segment="""\
def reboot(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.reboot()

parallel_simple(reboot, machines, log=False)
"""
80
# Control segment that (re)installs the OS on every machine in parallel;
# used for install_before/install_after job options.
install="""\
def install(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.machine_install()

parallel_simple(install, machines, log=False)
"""
88
# load up the verifier control segment, with an optional site-specific
# hook prepended (load_control_segment returns '' if the file is absent)
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
mbligh1d42d4e2007-11-05 22:42:00 +000097
# load up site-specific code for generating site-specific job data;
# 'del site_job' keeps the module name out of this namespace once the
# hook function has been captured
try:
	import site_job
	get_site_job_data = site_job.get_site_job_data
	del site_job
except ImportError:
	# by default provide a stub that generates no site data
	def get_site_job_data(job):
		return {}
107
108
class server_job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
		serverdir
			<autodir>/server/
		clientdir
			<autodir>/client/
		conmuxdir
			<autodir>/conmux/
		testdir
			<autodir>/server/tests/
		tmpdir
			<autodir>/server/tmp/
		control
			the control file for this job
	"""

	def __init__(self, control, args, resultdir, label, user, machines,
				client = False):
		"""
			control
				The control file (pathname of)
			args
				args to pass to the control file
			resultdir
				where to throw the results
			label
				label for the job
			user
				Username for the job (email address)
			machines
				list of machine hostnames this job runs on
			client
				True if a client-side control file
		"""
		# locate the autotest tree relative to this module's file
		path = os.path.dirname(sys.modules['server_job'].__file__)
		self.autodir = os.path.abspath(os.path.join(path, '..'))
		self.serverdir = os.path.join(self.autodir, 'server')
		self.testdir = os.path.join(self.serverdir, 'tests')
		self.tmpdir = os.path.join(self.serverdir, 'tmp')
		self.conmuxdir = os.path.join(self.autodir, 'conmux')
		self.clientdir = os.path.join(self.autodir, 'client')
		if control:
			# read the control file up front; strip DOS CRs so
			# exec() sees clean python source
			self.control = open(control, 'r').read()
			self.control = re.sub('\r', '', self.control)
		else:
			self.control = None
		self.resultdir = resultdir
		# create the results and debug directories if needed
		if not os.path.exists(resultdir):
			os.mkdir(resultdir)
		self.debugdir = os.path.join(resultdir, 'debug')
		if not os.path.exists(self.debugdir):
			os.mkdir(self.debugdir)
		self.status = os.path.join(resultdir, 'status')
		self.label = label
		self.user = user
		self.args = args
		self.machines = machines
		self.client = client
		# record_prefix holds the current indent (tabs) for
		# START/END grouping in the status log
		self.record_prefix = ''
		# file-like objects polled by _read_warnings() for console
		# warnings; console monitors register themselves here
		self.warning_loggers = set()

		# fd_stack allows pushing/popping redirections of the real
		# stdout/stderr file descriptors
		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)

		# remove any stale status file left by a previous run
		if os.path.exists(self.status):
			os.unlink(self.status)
		# write the job-level keyvals (plus any site-specific data)
		job_data = { 'label' : label, 'user' : user,
			     'hostname' : ','.join(machines) }
		job_data.update(get_site_job_data(self))
		write_keyval(self.resultdir, job_data)


	def verify(self):
		"""Run the 'verify' control segment against self.machines.

		Records an ABORT in the status log and re-raises if the
		segment throws; raises AutoservError when no machines were
		supplied.
		"""
		if not self.machines:
			raise AutoservError('No machines specified to verify')
		try:
			namespace = {'machines' : self.machines, 'job' : self}
			exec(preamble + verify, namespace, namespace)
		except Exception, e:
			msg = 'Verify failed\n' + str(e) + '\n' + format_error()
			self.record('ABORT', None, None, msg)
			raise


	def repair(self):
		"""Run the 'repair' control segment, then re-verify.

		Raises AutoservError when no machines were supplied.
		"""
		if not self.machines:
			raise AutoservError('No machines specified to repair')
		namespace = {'machines' : self.machines, 'job' : self}
		# no matter what happens during repair, go on to try to reverify
		try:
			exec(preamble + repair, namespace, namespace)
		except Exception, exc:
			print 'Exception occured during repair'
			traceback.print_exc()
		self.verify()


	def run(self, reboot = False, install_before = False,
		install_after = False, namespace = {}):
		"""Execute this job's control file.

		reboot: reboot all machines after the job body completes
		install_before/install_after: run the OS-install segment
		around the job body
		namespace: extra names exposed to the exec'd control file
		(the mutable {} default is safe only because it is copied
		immediately below and never mutated in place)
		"""
		# use a copy so changes don't affect the original dictionary
		namespace = namespace.copy()
		machines = self.machines

		self.aborted = False
		namespace['machines'] = machines
		namespace['args'] = self.args
		namespace['job'] = self
		# remember when we started so crashdump collection can be
		# limited to dumps newer than this
		test_start_time = int(time.time())

		os.chdir(self.resultdir)

		status_log = os.path.join(self.resultdir, 'status.log')
		try:
			if install_before and machines:
				exec(preamble + install, namespace, namespace)
			if self.client:
				# client-side control file: save it and wrap
				# it so it is pushed to and run on each client
				namespace['control'] = self.control
				open('control', 'w').write(self.control)
				open('control.srv', 'w').write(client_wrapper)
				server_control = client_wrapper
			else:
				open('control.srv', 'w').write(self.control)
				server_control = self.control
			exec(preamble + server_control, namespace, namespace)

		finally:
			# always collect crashdumps, and honour the
			# reboot/install_after options, even on failure
			if machines:
				namespace['test_start_time'] = test_start_time
				exec(preamble + crashdumps,
				     namespace, namespace)
			if reboot and machines:
				exec(preamble + reboot_segment,
				     namespace, namespace)
			if install_after and machines:
				exec(preamble + install, namespace, namespace)


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		dargs may carry an optional 'tag' (appended to the results
		subdir name); it is removed before being passed to the test.
		A failing test is recorded as FAIL but not re-raised.
		"""

		(group, testname) = test.testname(url)
		tag = None
		subdir = testname

		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag

		try:
			test.runtest(self, url, tag, args, dargs)
			self.record('GOOD', subdir, testname, 'completed successfully')
		except Exception, detail:
			self.record('FAIL', subdir, testname, format_error())


	def run_group(self, function, *args, **dargs):
		"""Run a function as a named, indented group in the status log.

		function:
			subroutine to run
		*args:
			arguments for the function
		An optional dargs['tag'] overrides the group name.  A
		TestError from the group is swallowed (so later tests still
		run); any other exception is wrapped in a TestError and
		re-raised.  Returns the function's return value (or None on
		failure).
		"""

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				# indent everything recorded inside the group
				self.record_prefix += '\t'
				result = function(*args, **dargs)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result


	def record(self, status_code, subdir, operation, status=''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and is
		governed by self.record_prefix

		multiline messages have secondary lines prefaced by a double
		space ('  ')

		Executing this method will trigger the logging of all new
		warnings to date from the various console loggers.
		"""
		# poll all our warning loggers for new warnings
		warnings = self._read_warnings()
		for timestamp, msg in warnings:
			self.__record("WARN", None, None, msg, timestamp)

		# write out the actual status log line
		self.__record(status_code, subdir, operation, status)


	def _read_warnings(self):
		"""Drain all pending warning lines from self.warning_loggers.

		Each logger line is '<epoch>\t<message>'.  Returns a list of
		(timestamp, message) tuples sorted by timestamp.  Loggers
		that hit EOF are dropped from self.warning_loggers.
		"""
		warnings = []
		while True:
			# pull in a line of output from every logger that has
			# output ready to be read
			loggers, _, _ = select.select(self.warning_loggers,
						      [], [], 0)
			closed_loggers = set()
			for logger in loggers:
				line = logger.readline()
				# record any broken pipes (aka line == empty)
				if len(line) == 0:
					closed_loggers.add(logger)
					continue
				timestamp, msg = line.split('\t', 1)
				warnings.append((int(timestamp), msg.strip()))

			# stop listening to loggers that are closed
			self.warning_loggers -= closed_loggers

			# stop if none of the loggers have any output left
			if not loggers:
				break

		# sort into timestamp order
		warnings.sort()
		return warnings


	def _render_record(self, status_code, subdir, operation, status='',
			   epoch_time=None, record_prefix=None):
		"""
		Internal Function to generate a record to be written into a
		status log. For use by server_job.* classes only.

		Raises ValueError for an unknown status_code or for tabs or
		newlines embedded in subdir/operation (they would corrupt
		the tab-separated log format).
		"""
		if subdir:
			if re.match(r'[\n\t]', subdir):
				raise ValueError('Invalid character in subdir string')
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
							status_code):
			raise ValueError('Invalid status code supplied: %s' % status_code)
		if not operation:
			operation = '----'
		if re.match(r'[\n\t]', operation):
			raise ValueError('Invalid character in operation string')
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		# Generate timestamps for inclusion in the logs
		if epoch_time is None:
			epoch_time = int(time.time())
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		if record_prefix is None:
			record_prefix = self.record_prefix

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		return record_prefix + msg + '\n'


	def _record_prerendered(self, msg):
		"""
		Record a pre-rendered msg into the status logs. The only
		change this makes to the message is to add on the local
		indentation. Should not be called outside of server_job.*
		classes. Unlike __record, this does not write the message
		to standard output.
		"""
		status_file = os.path.join(self.resultdir, 'status.log')
		status_log = open(status_file, 'a')
		for line in msg.splitlines():
			line = self.record_prefix + line + '\n'
			status_log.write(line)
		status_log.close()
		self.__parse_status()


	def __record(self, status_code, subdir, operation, status='',
		     epoch_time=None):
		"""
		Actual function for recording a single line into the status
		logs. Should never be called directly, only by job.record as
		this would bypass the console monitor logging.

		Writes the rendered line to stdout, to the job-level
		status.log, and (when subdir is given) to the per-test
		status file, then kicks the parser.
		"""

		msg = self._render_record(status_code, subdir, operation,
					  status, epoch_time)


		status_file = os.path.join(self.resultdir, 'status.log')
		sys.stdout.write(msg)
		open(status_file, "a").write(msg)
		if subdir:
			test_dir = os.path.join(self.resultdir, subdir)
			if not os.path.exists(test_dir):
				os.mkdir(test_dir)
			status_file = os.path.join(test_dir, 'status')
			open(status_file, "a").write(msg)
		self.__parse_status()


	def __parse_status(self):
		"""
		If a .parse.cmd file is present in the results directory,
		launch the tko parser.

		NOTE(review): the command is run via the shell and the
		Popen handle is discarded (fire-and-forget, never waited
		on) -- presumably intentional, but worth confirming.
		"""
		cmdfile = os.path.join(self.resultdir, '.parse.cmd')
		if os.path.exists(cmdfile):
			cmd = open(cmdfile).read().strip()
			subprocess.Popen(cmd, shell=True)
mblighdab39662008-02-27 16:47:55 +0000479
480
481# a file-like object for catching stderr from an autotest client and
482# extracting status logs from it
class client_logger(object):
	"""Partial file object to write to both stdout and
	the status log file. We only implement those methods
	utils.run() actually calls.

	Client status lines arrive quoted as 'AUTOTEST_STATUS:<tag>:<line>';
	everything else on the stream is treated as ssh noise and echoed to
	stderr.  Lines are buffered per-tag in self.logs so that nested
	(parallel) sub-job output is emitted in a stable, non-interleaved
	order.
	"""
	# matches a quoted client status line: group(1)=tag, group(2)=payload
	parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
	# captures the leading run of tabs (the status-log indent level)
	extract_indent = re.compile(r"^(\t*).*$")

	def __init__(self, job):
		# the owning server_job; used for rendering/recording lines
		self.job = job
		# partial (unterminated) line carried over between write()s
		self.leftover = ""
		# most recently emitted status line, used to indent warnings
		self.last_line = ""
		# nested dict of buffered lines, keyed by numeric tag parts,
		# with the actual lines under the special key "logs"
		self.logs = {}


	def _process_log_dict(self, log_dict):
		# flatten the nested tag dict into a single ordered list:
		# a node's own "logs" come before its numbered children,
		# and children are visited in ascending tag order
		log_list = log_dict.pop("logs", [])
		for key in sorted(log_dict.iterkeys()):
			log_list += self._process_log_dict(log_dict.pop(key))
		return log_list


	def _process_logs(self):
		"""Go through the accumulated logs in self.log and print them
		out to stdout and the status log. Note that this processes
		logs in an ordering where:

		1) logs to different tags are never interleaved
		2) logs to x.y come before logs to x.y.z for all z
		3) logs to x.y come before x.z whenever y < z

		Note that this will in general not be the same as the
		chronological ordering of the logs. However, if a chronological
		ordering is desired that one can be reconstructed from the
		status log by looking at timestamp lines."""
		log_list = self._process_log_dict(self.logs)
		for line in log_list:
			self.job._record_prerendered(line + '\n')
		if log_list:
			self.last_line = log_list[-1]


	def _process_quoted_line(self, tag, line):
		"""Process a line quoted with an AUTOTEST_STATUS flag. If the
		tag is blank then we want to push out all the data we've been
		building up in self.logs, and then the newest line. If the
		tag is not blank, then push the line into the logs for handling
		later."""
		print line
		if tag == "":
			# untagged line: flush everything buffered, then
			# record this line immediately
			self._process_logs()
			self.job._record_prerendered(line + '\n')
			self.last_line = line
		else:
			# tagged line: descend/create the nested dict for
			# e.g. tag "1.2" -> logs[1][2]["logs"]
			tag_parts = [int(x) for x in tag.split(".")]
			log_dict = self.logs
			for part in tag_parts:
				log_dict = log_dict.setdefault(part, {})
			log_list = log_dict.setdefault("logs", [])
			log_list.append(line)


	def _process_line(self, line):
		"""Write out a line of data to the appropriate stream. Status
		lines sent by autotest will be prepended with
		"AUTOTEST_STATUS", and all other lines are ssh error
		messages."""
		match = self.parser.search(line)
		if match:
			tag, line = match.groups()
			self._process_quoted_line(tag, line)
		else:
			print >> sys.stderr, line


	def _format_warnings(self, last_line, warnings):
		"""Render (timestamp, msg) warnings as WARN status lines,
		indented to match the surrounding log context."""
		# use the indentation of whatever the last log line was
		indent = self.extract_indent.match(last_line).group(1)
		# if the last line starts a new group, add an extra indent
		if last_line.lstrip('\t').startswith("START\t"):
			indent += '\t'
		return [self.job._render_record("WARN", None, None, msg,
						timestamp, indent).rstrip('\n')
			for timestamp, msg in warnings]


	def _process_warnings(self, last_line, log_dict, warnings):
		"""Attach rendered warnings at every leaf of the log tree.

		NOTE(review): the leaf test compares keys() against list
		literals, which relies on Python 2's list-returning keys();
		it would need updating for Python 3.
		"""
		if log_dict.keys() in ([], ["logs"]):
			# there are no sub-jobs, just append the warnings here
			warnings = self._format_warnings(last_line, warnings)
			log_list = log_dict.setdefault("logs", [])
			log_list += warnings
			for warning in warnings:
				sys.stdout.write(warning + '\n')
		else:
			# there are sub-jobs, so put the warnings in there
			log_list = log_dict.get("logs", [])
			if log_list:
				last_line = log_list[-1]
			for key in sorted(log_dict.iterkeys()):
				if key != "logs":
					self._process_warnings(last_line,
							       log_dict[key],
							       warnings)


	def write(self, data):
		# first check for any new console warnings
		warnings = self.job._read_warnings()
		self._process_warnings(self.last_line, self.logs, warnings)
		# now process the newest data written out
		data = self.leftover + data
		lines = data.split("\n")
		# process every line but the last one
		for line in lines[:-1]:
			self._process_line(line)
		# save the last line for later processing
		# since we may not have the whole line yet
		self.leftover = lines[-1]


	def flush(self):
		# keep both streams in sync with what we have echoed so far
		sys.stdout.flush()
		sys.stderr.flush()


	def close(self):
		# emit any trailing partial line, then drain the buffers
		if self.leftover:
			self._process_line(self.leftover)
		self._process_logs()
		self.flush()