blob: b1e1b273096b64ce8bb7fbf957353ba420f9fede [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mblighc3aee0f2008-01-17 16:26:39 +00003import sys, os, subprocess, traceback, time, utils
mbligh3177be42008-01-15 20:36:54 +00004from common.error import *
mblighb0fab822007-07-25 16:40:19 +00005
6
def parallel(tasklist, timeout=None):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() before any of them is waited
    on, then each one is collected in order with fork_waitfor().

    tasklist - objects providing fork_start() and fork_waitfor(timeout)
    timeout  - overall time budget in seconds shared by all tasks; a false
               value (None or 0) means wait indefinitely

    Raises AutoservError if any task reports a status other than 0
    (this includes the None that fork_waitfor returns on a timeout).
    """
    error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    for task in tasklist:
        if timeout:
            # Always give each task at least one second, even when the
            # overall deadline has already passed.
            remaining_timeout = max(endtime - time.time(), 1)
        status = task.fork_waitfor(remaining_timeout)
        if status != 0:
            error = True
    if error:
        raise AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000026
27
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call "function" once per element of arglist, in parallel.

    Each element becomes the single argument of one subcommand object.
    When log is true the string form of the element is also used as the
    subcommand's subdir name; otherwise no subdir is used.
    All subcommands are then handed to parallel() together with timeout.
    """
    # With exactly one element there is nothing to parallelise: call the
    # function directly in this process and skip the fork machinery.
    if len(arglist) == 1:
        function(arglist[0])
        return

    tasks = []
    for item in arglist:
        logdir = None
        if log:
            logdir = str(item)
        tasks.append(subcommand(function, [item], logdir))
    parallel(tasks, timeout)
mblighb0fab822007-07-25 16:40:19 +000049
50
def _where_art_thy_filehandles():
    """Debugging aid: append a listing of /proc/<pid>/fd for the current
    process to /dev/tty."""
    command = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(command)
53
mblighdc735a22007-08-02 16:54:37 +000054
def _print_to_tty(string):
    """Write string followed by a newline to /dev/tty.

    The file is closed explicitly so the descriptor is released (and the
    text flushed) immediately instead of whenever the file object happens
    to be garbage collected.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        tty.close()
57
58
mblighcee25b12007-08-31 08:53:05 +000059def _redirect_stream(fd, output):
mblighb0fab822007-07-25 16:40:19 +000060 newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
61 os.dup2(newfd, fd)
62 os.close(newfd)
63 if fd == 1:
64 sys.stdout = os.fdopen(fd, 'w')
65 if fd == 2:
66 sys.stderr = os.fdopen(fd, 'w')
67
68
mblighcee25b12007-08-31 08:53:05 +000069def _redirect_stream_tee(fd, output, tag):
mblighb0fab822007-07-25 16:40:19 +000070 """Use the low-level fork & pipe operations here to get a fd,
71 not a filehandle. This ensures that we get both the
72 filehandle and fd for stdout/stderr redirected correctly."""
73 r, w = os.pipe()
74 pid = os.fork()
75 if pid: # Parent
76 os.dup2(w, fd)
77 os.close(r)
78 os.close(w)
79 if fd == 1:
80 sys.stdout = os.fdopen(fd, 'w', 1)
81 if fd == 2:
82 sys.stderr = os.fdopen(fd, 'w', 1)
83 return
84 else: # Child
85 os.close(w)
86 log = open(output, 'w')
87 f = os.fdopen(r, 'r')
88 for line in iter(f.readline, ''):
89 # Tee straight to file
90 log.write(line)
91 log.flush()
92 # Prepend stdout with the tag
93 print tag + ' : ' + line,
94 sys.stdout.flush()
95 log.close()
96 os._exit(0)
97
98
class subcommand:
    """A function call that is run in a forked child process.

    fork_start() forks and runs func(*args) in the child; fork_waitfor()
    collects the child's wait status in the parent. When a subdir is
    given, the child changes into it and its output is logged under
    <subdir>/debug/stdout and <subdir>/debug/stderr.
    """

    def __init__(self, func, args, subdir = None, stdprint = True):
        # func(args) - the subcommand to run
        # subdir - the subdirectory to log results in
        # stdprint - whether to print results to stdout/stderr
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if os.path.exists(self.subdir):
                # Pass the path as a separate argv entry rather than
                # through a shell string, so names containing spaces or
                # shell metacharacters are removed correctly.
                subprocess.call(['rm', '-rf', self.subdir])
            os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.stdprint = stdprint


    def redirect_output(self):
        """Redirect this process's stdout/stderr to the log files.

        With stdprint set, output is teed to the logs (tagged with the
        subdir's base name) only when a subdir exists; without a subdir
        nothing is redirected. With stdprint unset, output goes only to
        self.stdout / self.stderr.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)


    def fork_start(self):
        """Fork; the parent returns at once with self.pid set, the child
        runs the subcommand and exits with 0 on success or 1 on failure."""
        # Flush first so buffered output is not duplicated into the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid:  # I am the parent
            return

        # We are the child from this point on. Never return.
        # The chdir and the output redirection are inside the try as
        # well: if they raised outside it, the exception would unwind
        # into the caller's code and the child would carry on running it.
        exit_code = 1
        try:
            try:
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()
                self.lambda_function()
                exit_code = 0
            except:
                # Deliberately catches everything: any exception in the
                # child is reported and turned into exit status 1.
                traceback.print_exc()
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def fork_waitfor(self, timeout=None):
        """Wait for the forked child and return its wait status.

        timeout - seconds to wait; a false value (None or 0) blocks
                  until the child exits

        Returns the raw status from os.waitpid() (0 means success), or
        None when the timeout expired, in which case the child is
        passed to utils.nuke_pid(). Failures are reported on stdout,
        followed by the contents of the stderr log if it exists.
        """
        if not timeout:
            (pid, status) = os.waitpid(self.pid, 0)
        else:
            pid = None
            start_time = time.time()
            # Poll once a second until the child exits or time runs out.
            while time.time() <= start_time + timeout:
                (pid, status) = os.waitpid(self.pid, os.WNOHANG)
                if pid:
                    break
                time.sleep(1)

            if not pid:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s(%s)" % (self.func, self.args))
                print("timeout after %ds" % timeout)
                print("")
                return None

        if status != 0:
            print("subcommand failed pid %d" % pid)
            print("%s(%s)" % (self.func, self.args))
            print("rc=%d" % status)
            print("")
            if os.path.exists(self.stderr):
                stderr_log = open(self.stderr)
                try:
                    for line in stderr_log:
                        sys.stdout.write(line)
                finally:
                    # Close explicitly instead of leaking the handle.
                    stderr_log.close()
            print("\n--------------------------------------------\n")
        return status
193 return status