blob: ef4c403ede56cd945bfa525768bf7c3483efe2d3 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
3import sys, os, subprocess
4
5
def parallel(tasklist):
    """Run a set of predefined subcommands in parallel.

    Every task is started first; afterwards each one is waited for in turn,
    so all tasks are reaped even when an earlier one has already failed.

    tasklist -- iterable of task objects providing fork_start() and
                fork_waitfor(); fork_waitfor() returns 0 on success.

    Raises RuntimeError if at least one task reported a non-zero status.
    """
    # The tasks are traversed twice; materialise them once so that a
    # one-shot iterator does not silently skip the wait phase.
    tasks = list(tasklist)

    for task in tasks:
        task.fork_start()

    # Wait for every task, even after a failure has been seen, so that no
    # started child is left unreaped.
    failed = False
    for task in tasks:
        if task.fork_waitfor() != 0:
            failed = True

    if failed:
        # A plain string is not a valid exception object; raise a real one
        # carrying the same message.
        raise RuntimeError("One or more subcommands failed")
18
19
def parallel_simple(function, arglist):
    """Run "function" once per element of arglist, all in parallel.

    One subcommand object is created for each element of arglist: the
    element is passed to "function" as its single argument, and the
    element's str() form is used as the subdirectory name.  The resulting
    subcommand objects are then executed in parallel."""
    jobs = [subcommand(function, [item], str(item)) for item in arglist]
    parallel(jobs)
32
33
def __where_art_thy_filehandles():
    """Debugging aid: append an "ls -l" listing of this process's
    /proc/<pid>/fd directory to /dev/tty."""
    listing_command = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_command)
36
mblighdc735a22007-08-02 16:54:37 +000037
def __print_to_tty(string):
    """Write "string" followed by a newline to /dev/tty.

    The terminal is opened for each call and closed again before
    returning, so the text is flushed and the descriptor released
    deterministically rather than whenever the file object happens to be
    reclaimed.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        tty.close()
40
41
def __redirect_stream(fd, output):
    """Point file descriptor "fd" at the file named "output".

    The file is created if it does not exist and truncated if it does.
    When fd is 1 or 2, sys.stdout / sys.stderr is additionally rebound to
    a new file object on that descriptor, so that both the descriptor and
    the Python-level stream end up redirected.

    fd     -- integer file descriptor to redirect
    output -- path of the file that receives everything written to fd
    """
    # O_TRUNC: without it, redirecting onto an existing, longer file leaves
    # the old tail in place behind the new output.
    newfd = os.open(output, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)

    # If fd was closed beforehand, os.open() may hand back fd itself; the
    # redirect is then already in place and closing newfd would undo it.
    if newfd != fd:
        os.dup2(newfd, fd)
        os.close(newfd)

    if fd == 1:
        sys.stdout = os.fdopen(fd, 'w')
    elif fd == 2:
        sys.stderr = os.fdopen(fd, 'w')
50
51
def __redirect_stream_tee(fd, output, tag):
    """Redirect "fd" into a pipe that a forked helper process tees.

    Use the low-level fork & pipe operations here to get a fd, not a
    filehandle.  This ensures that we get both the filehandle and fd for
    stdout/stderr redirected correctly.

    In the calling process, fd is rewired to the write end of a new pipe
    (and sys.stdout / sys.stderr is rebound, line buffered, when fd is 1
    or 2); the call then returns.  The forked helper reads the pipe line
    by line, writes each line to the file "output" and echoes it on its
    own stdout prefixed with "<tag> : ".  The helper never returns to the
    caller: it exits with status 0 at end of input, or with status 1
    (after printing a traceback) if anything goes wrong.

    fd     -- integer file descriptor to redirect
    output -- path of the log file the helper writes
    tag    -- prefix for the lines echoed by the helper
    """
    import traceback

    r, w = os.pipe()
    pid = os.fork()
    if pid:                                 # Parent
        os.dup2(w, fd)
        os.close(r)
        os.close(w)
        if fd == 1:
            sys.stdout = os.fdopen(fd, 'w', 1)
        if fd == 2:
            sys.stderr = os.fdopen(fd, 'w', 1)
        return

    # Child.  Whatever happens below, this process must end in os._exit():
    # letting an exception propagate would make the helper continue running
    # the caller's code as a second copy of it.
    status = 1
    try:
        try:
            os.close(w)
            log = open(output, 'w')
            try:
                f = os.fdopen(r, 'r')
                # readline() hands over one line at a time, so each line
                # is forwarded as soon as it has been read.
                for line in iter(f.readline, ''):
                    # Tee straight to file
                    log.write(line)
                    log.flush()
                    # Prepend stdout with the tag
                    sys.stdout.write(tag + ' : ' + line)
                    sys.stdout.flush()
            finally:
                log.close()
            status = 0
        except:
            # Deliberately catch everything: report it, then exit below.
            traceback.print_exc()
    finally:
        os._exit(status)
80
81
class subcommand:
    """A function call that is run in a forked child process.

    Each subcommand owns a results directory.  The child changes into that
    directory and has its stdout and stderr captured in
    <subdir>/debug/stdout and <subdir>/debug/stderr.  Use fork_start() to
    launch the child and fork_waitfor() to collect its exit status.
    """

    def __init__(self, func, args, subdir, tee=True):
        """Prepare the results directory and remember what to run.

        func   -- the callable to run in the child
        args   -- sequence of positional arguments; the child calls
                  func(*args)
        subdir -- the subdirectory to log results in; anything already
                  present at that path is removed and the directory is
                  created afresh
        tee    -- if true, output is logged and also echoed with a tag;
                  otherwise it is only written to the log files

        Raises ValueError if subdir is empty.
        """
        import shutil

        if not subdir:
            # A plain string is not a valid exception object; raise a real
            # exception carrying the same message.
            raise ValueError("No subdirectory specified for subcommand")

        # Absolute path of the results directory.
        self.subdir = os.path.abspath(subdir)
        if os.path.exists(self.subdir):
            # Remove without going through a shell, so that paths
            # containing spaces or shell metacharacters are handled.
            if os.path.isdir(self.subdir) and not os.path.islink(self.subdir):
                shutil.rmtree(self.subdir)
            else:
                os.remove(self.subdir)
        os.mkdir(self.subdir)

        # Directory and files that receive the child's captured output.
        self.debug = os.path.join(self.subdir, 'debug')
        self.stdout = os.path.join(self.debug, 'stdout')
        self.stderr = os.path.join(self.debug, 'stderr')
        os.mkdir(self.debug)

        self.func = func
        self.args = args
        # Zero-argument callable that performs the actual call.
        self.lambda_function = lambda: func(*args)
        # Process id of the child; set by fork_start().
        self.pid = None
        self.tee = tee

    def redirect_output(self):
        """Redirect file descriptors 1 and 2 to this subcommand's logs.

        With tee enabled the tee helper is used, tagged with the base name
        of the results directory; otherwise the streams are redirected
        straight into the log files.
        """
        # The helpers' names start with two underscores.  Written as plain
        # identifiers inside this class they would be name-mangled (to
        # _subcommand__redirect_stream...) and fail with NameError, so they
        # are fetched from the module namespace by their literal names.
        module_scope = globals()
        if self.tee:
            redirect_tee = module_scope['__redirect_stream_tee']
            tag = os.path.basename(self.subdir)
            redirect_tee(1, self.stdout, tag)
            redirect_tee(2, self.stderr, tag)
        else:
            redirect = module_scope['__redirect_stream']
            redirect(1, self.stdout)
            redirect(2, self.stderr)

    def fork_start(self):
        """Fork a child that runs the subcommand.

        The parent records the child's pid in self.pid and returns at
        once.  The child changes into the results directory, redirects its
        output, calls the function and then terminates with os._exit():
        status 0 on success, status 1 (after printing a traceback) on any
        failure.  The child never returns from this method.
        """
        import traceback

        # Flush first so buffered output is not duplicated into the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            return

        # We are the child from this point on. Never return.
        exit_code = 1
        try:
            try:
                os.chdir(self.subdir)
                self.redirect_output()
                self.lambda_function()
                exit_code = 0
            except:
                # Deliberately catch everything: letting an exception
                # propagate would run the parent's handlers and code in
                # this child process.
                traceback.print_exc()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(exit_code)

    def fork_waitfor(self):
        """Wait for the child and return its status as given by waitpid.

        When the status is non-zero, a failure report is written to
        stdout: the pid, the function and arguments, the status and, if
        it exists, the contents of the child's stderr log.
        """
        (pid, status) = os.waitpid(self.pid, 0)

        if status != 0:
            emit = sys.stdout.write
            emit("subcommand failed pid %d\n" % pid)
            emit("%s(%s)\n" % (self.func, self.args))
            emit("rc=%d\n" % status)
            emit("\n")
            if os.path.exists(self.stderr):
                log = open(self.stderr)
                try:
                    for line in log:
                        emit(line)
                finally:
                    log.close()
            emit("\n--------------------------------------------\n\n")
        return status