blob: dbeb8b6844a214100db65aeb81ddd6d22e546638 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligh42ff92f2007-11-24 19:13:15 +00003import sys, os, subprocess, traceback
mbligh3177be42008-01-15 20:36:54 +00004from common.error import *
mblighb0fab822007-07-25 16:40:19 +00005
6
def parallel(tasklist):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() before any of them is waited
    on.  All tasks are then reaped with fork_waitfor(), including the ones
    that follow a failed task, and only afterwards is a failure reported.

    tasklist -- the tasks to run; each one must provide fork_start() and
                fork_waitfor(), the latter returning 0 on success.

    Raises AutoservError if any fork_waitfor() returned a non-zero status.
    """
    # Materialise once: the tasks are walked twice, and a one-shot iterable
    # would otherwise be exhausted by the start loop and never waited on.
    tasks = list(tasklist)

    for task in tasks:
        task.fork_start()

    error = False
    for task in tasks:
        # Keep waiting after a failure so that every child is reaped.
        if task.fork_waitfor() != 0:
            error = True

    if error:
        raise AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000019
20
def parallel_simple(function, arglist, log=True):
    """Call function once per element of arglist, in parallel.

    A subcommand object is built for every element of arglist; the element
    is the single argument handed to "function" and, when log is true, its
    str() is also used as the subdir name for that subcommand (otherwise
    no subdir is used).  The subcommand objects are then executed in
    parallel.

    With exactly one element no subcommand is created: function is called
    directly in this process."""

    # Bypass the multithreading if only one machine.
    if len(arglist) == 1:
        function(arglist[0])
        return

    jobs = []
    for item in arglist:
        subdir = None
        if log:
            subdir = str(item)
        jobs.append(subcommand(function, [item], subdir))
    parallel(jobs)
42
43
def _where_art_thy_filehandles():
    """Debugging aid: append an "ls -l" listing of this process's
    /proc/<pid>/fd directory to /dev/tty."""
    listing_cmd = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_cmd)
46
mblighdc735a22007-08-02 16:54:37 +000047
def _print_to_tty(string):
    """Write string, followed by a newline, to /dev/tty.

    string -- the text to write; the newline is added here.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        # Close explicitly so the text is flushed and the descriptor is
        # released right away, instead of whenever the file object happens
        # to be garbage collected.
        tty.close()
50
51
def _redirect_stream(fd, output):
    """Point file descriptor fd at the file named by output.

    output is opened write-only (created if missing) and duplicated onto
    fd.  When fd is 1 or 2, sys.stdout or sys.stderr respectively is also
    rebound to a new file object wrapping fd."""
    open_flags = os.O_WRONLY | os.O_CREAT
    target_fd = os.open(output, open_flags)
    os.dup2(target_fd, fd)
    # fd now refers to the file; the temporary descriptor is not needed.
    os.close(target_fd)

    if fd == 1:
        sys.stdout = os.fdopen(fd, 'w')
    elif fd == 2:
        sys.stderr = os.fdopen(fd, 'w')
60
61
def _redirect_stream_tee(fd, output, tag):
    """Redirect fd into a forked helper that logs and echoes every line.

    Use the low-level fork & pipe operations here to get a fd,
    not a filehandle.  This ensures that we get both the
    filehandle and fd for stdout/stderr redirected correctly.

    In the calling process fd is replaced by the write end of a new pipe
    and, for fd 1 or 2, sys.stdout or sys.stderr is rebound to a
    line-buffered file object on fd.  The forked helper reads the pipe
    line by line, writes each line to the file `output`, and echoes it to
    its own stdout as "<tag> : <line>".

    fd     -- file descriptor to redirect
    output -- path of the log file the helper writes (opened with 'w')
    tag    -- text put in front of every echoed line

    Only the calling process returns from this function.  The helper
    always finishes in os._exit(): status 0 once the pipe reaches EOF,
    status 1 if anything went wrong while logging.
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid:  # Parent
        os.dup2(w, fd)
        os.close(r)
        os.close(w)
        if fd == 1:
            sys.stdout = os.fdopen(fd, 'w', 1)
        if fd == 2:
            sys.stderr = os.fdopen(fd, 'w', 1)
        return

    # Child.  It must never return into the caller's code: an escaping
    # exception would leave a second copy of the caller running, so every
    # path out of here goes through os._exit().
    status = 1
    try:
        try:
            os.close(w)
            log = open(output, 'w')
            try:
                f = os.fdopen(r, 'r')
                for line in iter(f.readline, ''):
                    # Tee straight to file
                    log.write(line)
                    log.flush()
                    # Prepend stdout with the tag; line keeps its own
                    # newline, so nothing is added after it.
                    sys.stdout.write(tag + ' : ' + line)
                    sys.stdout.flush()
            finally:
                log.close()
            status = 0
        except Exception:
            traceback.print_exc()
    finally:
        os._exit(status)
90
91
class subcommand:
    """A function call that is executed in a forked child process.

    fork_start() forks; the child runs func(*args) and exits with status 0
    if the call returned normally or 1 otherwise.  fork_waitfor() waits
    for that child and returns its wait status.

    When a subdir is given, the child changes into it before running, and
    <subdir>/debug/stdout and <subdir>/debug/stderr are the files used for
    its output.
    """

    def __init__(self, func, args, subdir = None, stdprint = True):
        """Record the call to make and prepare its results directory.

        func(args) - the subcommand to run; args is unpacked, i.e. the
                     child calls func(*args)
        subdir     - the subdirectory to log results in.  Anything already
                     at that path is removed, then the directory and its
                     'debug' subdirectory are created.  When empty or
                     None, every log path is /dev/null.
        stdprint   - whether to print results to stdout/stderr
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if os.path.exists(self.subdir):
                # Pass the path as its own argv entry rather than pasting
                # it into a shell command line, so names containing spaces
                # or shell metacharacters are removed correctly.
                subprocess.call(['rm', '-rf', self.subdir])
            os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.stdprint = stdprint


    def redirect_output(self):
        """Redirect this process's stdout (fd 1) and stderr (fd 2).

        With stdprint set and a subdir, both streams are tee'd: logged to
        self.stdout / self.stderr and echoed tagged with the basename of
        the subdir.  With stdprint set and no subdir, nothing is
        redirected.  Without stdprint, both streams are sent only to
        self.stdout / self.stderr.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)


    def fork_start(self):
        """Fork a child that runs the subcommand; return in the parent only.

        The child's pid is stored in self.pid.  The child exits with
        status 0 if func(*args) returned normally and 1 if anything
        raised, including the chdir into subdir and the output
        redirection.
        """
        # Flush first so buffered output is not duplicated in the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid:  # I am the parent
            return

        # We are the child from this point on. Never return.
        status = 1
        try:
            try:
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()
                self.lambda_function()
                status = 0
            except:
                # Deliberately catch everything: whatever the subcommand
                # raises, the child must report it and exit rather than
                # unwind into the parent's code.
                traceback.print_exc()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(status)


    def fork_waitfor(self):
        """Wait for the forked child and return its status.

        The value returned is the status reported by os.waitpid(), so 0
        means the child exited successfully.  On a non-zero status a
        failure report is written to stdout: the pid, the function and
        its arguments, the status, and the contents of self.stderr if
        that file exists.
        """
        (pid, status) = os.waitpid(self.pid, 0)

        if status != 0:
            emit = sys.stdout.write
            emit("subcommand failed pid %d\n" % pid)
            emit("%s(%s)\n" % (self.func, self.args))
            emit("rc=%d\n" % status)
            emit("\n")
            if os.path.exists(self.stderr):
                errlog = open(self.stderr)
                try:
                    for line in errlog.readlines():
                        emit(line)
                finally:
                    errlog.close()
            emit("\n--------------------------------------------\n\n")
        return status
169 return status