blob: 54424f6a4411dd422fe4080ec2b59523f268de36 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
3import sys, os, subprocess
4
5
def parallel(tasklist):
    """Run a set of predefined subcommands in parallel.

    tasklist - iterable of task objects, each providing fork_start() and
               fork_waitfor(); fork_waitfor() must return 0 on success.

    Every task is started before any of them is waited for.  All tasks
    are waited for even when an earlier one has failed.

    Raises RuntimeError if any task reported a non-zero status.
    """
    # The tasks are walked twice (start, then wait), so take a snapshot in
    # case the caller handed us a one-shot iterator.
    tasks = list(tasklist)

    for task in tasks:
        task.fork_start()

    error = False
    for task in tasks:
        if task.fork_waitfor() != 0:
            error = True

    if error:
        # String exceptions are not supported by modern interpreters;
        # raise a real exception object carrying the same message.
        raise RuntimeError("One or more subcommands failed")
18
19
def parallel_simple(function, arglist):
    """Run "function" once per element of arglist, all in parallel.

    Each element serves two purposes: it is the single argument handed
    to "function", and its str() form names the subdirectory used by the
    subcommand object built for it.  The resulting subcommand objects
    are then executed together via parallel().
    """
    jobs = [subcommand(function, [item], str(item)) for item in arglist]
    parallel(jobs)
32
33
def __where_art_thy_filehandles():
    """Debug aid: list this process's /proc/<pid>/fd entries on /dev/tty."""
    listing_cmd = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_cmd)
36
def __print_to_tty(string):
    """Debug aid: write "string" followed by a newline to /dev/tty.

    The terminal device is opened for each call and closed again before
    returning, even if the write fails.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        # Close explicitly rather than relying on garbage collection.
        tty.close()
39
40
def __redirect_stream(fd, output):
    """Point file descriptor "fd" at the file named "output".

    The file is opened write-only and created if missing (it is not
    truncated), then duplicated onto fd.  When fd is 1 or 2 the matching
    sys.stdout / sys.stderr object is rebuilt on top of the descriptor so
    Python-level writes follow the redirection as well.
    """
    target = os.open(output, os.O_WRONLY | os.O_CREAT)
    os.dup2(target, fd)
    os.close(target)

    # Only the two standard output descriptors have a sys-level twin.
    stream_name = {1: 'stdout', 2: 'stderr'}.get(fd)
    if stream_name is not None:
        setattr(sys, stream_name, os.fdopen(fd, 'w'))
49
50
def _redirect_stream_tee(fd, output, tag):
    """Redirect "fd" into a pipe whose reader tees it to a file and stdout.

    Use the low-level fork & pipe operations here to get a fd,
    not a filehandle.  This ensures that we get both the
    filehandle and fd for stdout/stderr redirected correctly.

    fd     - descriptor to redirect; for 1 or 2 the matching sys.stdout /
             sys.stderr object is replaced with a line-buffered file on fd
    output - path of the log file the forked reader writes every line to
    tag    - prefix the reader puts (as "tag : ") in front of each line it
             echoes to its own stdout

    The calling process returns once its descriptor has been redirected.
    The forked reader process never returns: it copies lines until the
    pipe reaches EOF and then exits (status 0 on success, 1 on error).
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid:  # Parent
        os.dup2(w, fd)
        os.close(r)
        os.close(w)
        if fd == 1:
            sys.stdout = os.fdopen(fd, 'w', 1)
        if fd == 2:
            sys.stderr = os.fdopen(fd, 'w', 1)
        return

    # Child.  Whatever happens below, this process must end in os._exit()
    # so it never unwinds into the caller's code.
    exit_code = 1
    try:
        try:
            os.close(w)
            log = open(output, 'w')
            try:
                f = os.fdopen(r, 'r')
                for line in iter(f.readline, ''):
                    # Tee straight to file
                    log.write(line)
                    log.flush()
                    # Prepend stdout with the tag
                    sys.stdout.write(tag + ' : ' + line)
                    sys.stdout.flush()
            finally:
                log.close()
            exit_code = 0
        except:
            # Deliberately broad: report anything, then fall through to exit.
            import traceback
            traceback.print_exc()
    finally:
        os._exit(exit_code)
79
80
class subcommand:
    """A function call executed in a forked child with its own results dir.

    Construction prepares <subdir>/debug and records the call to make;
    fork_start() launches the child and fork_waitfor() collects its wait
    status.
    """

    def __init__(self, func, args, subdir, tee=True):
        """Record the call to run and create a fresh results directory.

        func   - the subcommand to run; invoked in the child as func(*args)
        args   - sequence of positional arguments for func
        subdir - the subdirectory to log results in; anything already at
                 that path is removed and the directory recreated
        tee    - stored as self.tee
                 NOTE(review): nothing in this class reads self.tee;
                 redirect_output() always tees - confirm intent.
        """
        self.subdir = os.path.abspath(subdir)
        if os.path.exists(self.subdir):
            # Pass the path as a single argument instead of building a
            # shell string, so spaces or shell metacharacters in the path
            # cannot make rm act on something else.
            subprocess.call(['rm', '-rf', self.subdir])
        os.mkdir(self.subdir)
        self.debug = os.path.join(self.subdir, 'debug')
        self.stdout = os.path.join(self.debug, 'stdout')
        self.stderr = os.path.join(self.debug, 'stderr')
        os.mkdir(self.debug)
        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.tee = tee


    def redirect_output(self):
        """Tee fds 1 and 2 into debug/stdout and debug/stderr.

        Lines are tagged with the basename of the results directory.
        """
        tag = os.path.basename(self.subdir)
        _redirect_stream_tee(1, self.stdout, tag)
        _redirect_stream_tee(2, self.stderr, tag)


    def fork_start(self):
        """Fork a child that runs the subcommand; return in the parent only.

        The parent returns immediately with self.pid set to the child's
        pid.  The child changes into the results directory, redirects its
        output, runs the function and then exits with status 0, or with
        status 1 (after printing a traceback) if anything raised.
        """
        # Flush first so buffered output is not duplicated into the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid:  # I am the parent
            return

        # We are the child from this point on. Never return.
        exit_code = 1
        try:
            try:
                os.chdir(self.subdir)
                self.redirect_output()
                self.lambda_function()
                exit_code = 0
            except:
                # Deliberately broad: any failure in the child must become
                # an exit status, not an exception that unwinds into the
                # code the parent was running.
                import traceback
                traceback.print_exc()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(exit_code)


    def fork_waitfor(self):
        """Wait for the child and return its raw wait status.

        The returned value is the status from os.waitpid(), 0 meaning
        success.  On a non-zero status a failure report, including the
        contents of the stderr log if it exists, is written to stdout.
        """
        (pid, status) = os.waitpid(self.pid, 0)

        if status != 0:
            out = sys.stdout.write
            out("subcommand failed pid %d\n" % pid)
            out("%s(%s)\n" % (self.func, self.args))
            out("rc=%d\n" % status)
            out("\n")
            if os.path.exists(self.stderr):
                errlog = open(self.stderr)
                try:
                    for line in errlog:
                        out(line)
                finally:
                    errlog.close()
            out("\n--------------------------------------------\n\n")
        return status