blob: 251c931004f6ce7fd0d4fc27647912ba8458f600 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligh42ff92f2007-11-24 19:13:15 +00003import sys, os, subprocess, traceback
mblighb0fab822007-07-25 16:40:19 +00004
5
6def parallel(tasklist):
7 """Run an set of predefined subcommands in parallel"""
8 pids = []
9 error = False
10 for task in tasklist:
11 task.fork_start()
12 for task in tasklist:
13 status = task.fork_waitfor()
14 if status != 0:
15 error = True
16 if error:
17 raise "One or more subcommands failed"
18
19
mbligh84c0ab12007-10-24 21:28:58 +000020def parallel_simple(function, arglist, log=True):
mblighb0fab822007-07-25 16:40:19 +000021 """Each element in the arglist used to create a subcommand object,
22 where that arg is used both as a subdir name, and a single argument
23 to pass to "function".
24 We create a subcommand object for each element in the list,
25 then execute those subcommand objects in parallel."""
26 subcommands = []
27 for arg in arglist:
mbligh0c9e7822007-07-25 22:47:51 +000028 args = [arg]
mbligh84c0ab12007-10-24 21:28:58 +000029 if log:
30 subdir = str(arg)
31 else:
32 subdir = None
mbligh0c9e7822007-07-25 22:47:51 +000033 subcommands.append(subcommand(function, args, subdir))
mblighb0fab822007-07-25 16:40:19 +000034 parallel(subcommands)
35
36
mblighcee25b12007-08-31 08:53:05 +000037def _where_art_thy_filehandles():
mblighb0fab822007-07-25 16:40:19 +000038 os.system("ls -l /proc/%d/fd >> /dev/tty" % os.getpid())
39
mblighdc735a22007-08-02 16:54:37 +000040
mblighcee25b12007-08-31 08:53:05 +000041def _print_to_tty(string):
mblighb0fab822007-07-25 16:40:19 +000042 open('/dev/tty', 'w').write(string + '\n')
43
44
mblighcee25b12007-08-31 08:53:05 +000045def _redirect_stream(fd, output):
mblighb0fab822007-07-25 16:40:19 +000046 newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
47 os.dup2(newfd, fd)
48 os.close(newfd)
49 if fd == 1:
50 sys.stdout = os.fdopen(fd, 'w')
51 if fd == 2:
52 sys.stderr = os.fdopen(fd, 'w')
53
54
mblighcee25b12007-08-31 08:53:05 +000055def _redirect_stream_tee(fd, output, tag):
mblighb0fab822007-07-25 16:40:19 +000056 """Use the low-level fork & pipe operations here to get a fd,
57 not a filehandle. This ensures that we get both the
58 filehandle and fd for stdout/stderr redirected correctly."""
59 r, w = os.pipe()
60 pid = os.fork()
61 if pid: # Parent
62 os.dup2(w, fd)
63 os.close(r)
64 os.close(w)
65 if fd == 1:
66 sys.stdout = os.fdopen(fd, 'w', 1)
67 if fd == 2:
68 sys.stderr = os.fdopen(fd, 'w', 1)
69 return
70 else: # Child
71 os.close(w)
72 log = open(output, 'w')
73 f = os.fdopen(r, 'r')
74 for line in iter(f.readline, ''):
75 # Tee straight to file
76 log.write(line)
77 log.flush()
78 # Prepend stdout with the tag
79 print tag + ' : ' + line,
80 sys.stdout.flush()
81 log.close()
82 os._exit(0)
83
84
85class subcommand:
mblighd7685d32007-08-10 22:08:42 +000086 def __init__(self, func, args, subdir = None, stdprint = True):
mblighb0fab822007-07-25 16:40:19 +000087 # func(args) - the subcommand to run
88 # subdir - the subdirectory to log results in
mblighd7685d32007-08-10 22:08:42 +000089 # stdprint - whether to print results to stdout/stderr
90 if subdir:
91 self.subdir = os.path.abspath(subdir)
92 if os.path.exists(self.subdir):
93 os.system("rm -rf %s" % self.subdir)
94 os.mkdir(self.subdir)
95 self.debug = os.path.join(self.subdir, 'debug')
mblighc526da32007-08-28 10:07:27 +000096 os.mkdir(self.debug)
mblighd7685d32007-08-10 22:08:42 +000097 self.stdout = os.path.join(self.debug, 'stdout')
98 self.stderr = os.path.join(self.debug, 'stderr')
99 else:
100 self.subdir = None
101 self.debug = '/dev/null'
102 self.stdout = '/dev/null'
103 self.stderr = '/dev/null'
104
mblighb0fab822007-07-25 16:40:19 +0000105 self.func = func
106 self.args = args
107 self.lambda_function = lambda: func(*args)
108 self.pid = None
mblighd7685d32007-08-10 22:08:42 +0000109 self.stdprint = stdprint
mblighb0fab822007-07-25 16:40:19 +0000110
111
112 def redirect_output(self):
mblighd7685d32007-08-10 22:08:42 +0000113 if self.stdprint:
114 if self.subdir:
115 tag = os.path.basename(self.subdir)
mblighcee25b12007-08-31 08:53:05 +0000116 _redirect_stream_tee(1, self.stdout, tag)
117 _redirect_stream_tee(2, self.stderr, tag)
mblighb491d022007-08-09 23:04:56 +0000118 else:
mblighcee25b12007-08-31 08:53:05 +0000119 _redirect_stream(1, self.stdout)
120 _redirect_stream(2, self.stderr)
mblighb0fab822007-07-25 16:40:19 +0000121
122
123 def fork_start(self):
124 sys.stdout.flush()
125 sys.stderr.flush()
126 self.pid = os.fork()
127
128 if self.pid: # I am the parent
129 return
130
131 # We are the child from this point on. Never return.
mblighd7685d32007-08-10 22:08:42 +0000132 if self.subdir:
133 os.chdir(self.subdir)
mblighb0fab822007-07-25 16:40:19 +0000134 self.redirect_output()
135
136 try:
137 self.lambda_function()
138
139 except:
mbligh42ff92f2007-11-24 19:13:15 +0000140 traceback.print_exc()
mblighb0fab822007-07-25 16:40:19 +0000141 sys.stdout.flush()
142 sys.stderr.flush()
143 os._exit(1)
144
145 sys.stdout.flush()
146 sys.stderr.flush()
147 os._exit(0)
148
149
150 def fork_waitfor(self):
151 (pid, status) = os.waitpid(self.pid, 0)
152
153 if status != 0:
154 print "subcommand failed pid %d" % pid
155 print "%s(%s)" % (self.func, self.args)
156 print "rc=%d" % status
157 print
158 if os.path.exists(self.stderr):
159 for line in open(self.stderr).readlines():
160 print line,
161 print "\n--------------------------------------------\n"
162 return status