blob: 67c6bff9b226bf53faf2f1cb2bc54f7803aa41c5 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
3import sys, os, subprocess
4
5
6def parallel(tasklist):
7 """Run an set of predefined subcommands in parallel"""
8 pids = []
9 error = False
10 for task in tasklist:
11 task.fork_start()
12 for task in tasklist:
13 status = task.fork_waitfor()
14 if status != 0:
15 error = True
16 if error:
17 raise "One or more subcommands failed"
18
19
20def parallel_simple(function, arglist):
21 """Each element in the arglist used to create a subcommand object,
22 where that arg is used both as a subdir name, and a single argument
23 to pass to "function".
24 We create a subcommand object for each element in the list,
25 then execute those subcommand objects in parallel."""
26 subcommands = []
27 for arg in arglist:
28 subcommands.append(subcommand(function, [arg], arg))
29 parallel(subcommands)
30
31
32def __where_art_thy_filehandles():
33 os.system("ls -l /proc/%d/fd >> /dev/tty" % os.getpid())
34
35def __print_to_tty(string):
36 open('/dev/tty', 'w').write(string + '\n')
37
38
39def __redirect_stream(fd, output):
40 newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
41 os.dup2(newfd, fd)
42 os.close(newfd)
43 if fd == 1:
44 sys.stdout = os.fdopen(fd, 'w')
45 if fd == 2:
46 sys.stderr = os.fdopen(fd, 'w')
47
48
49def _redirect_stream_tee(fd, output, tag):
50 """Use the low-level fork & pipe operations here to get a fd,
51 not a filehandle. This ensures that we get both the
52 filehandle and fd for stdout/stderr redirected correctly."""
53 r, w = os.pipe()
54 pid = os.fork()
55 if pid: # Parent
56 os.dup2(w, fd)
57 os.close(r)
58 os.close(w)
59 if fd == 1:
60 sys.stdout = os.fdopen(fd, 'w', 1)
61 if fd == 2:
62 sys.stderr = os.fdopen(fd, 'w', 1)
63 return
64 else: # Child
65 os.close(w)
66 log = open(output, 'w')
67 f = os.fdopen(r, 'r')
68 for line in iter(f.readline, ''):
69 # Tee straight to file
70 log.write(line)
71 log.flush()
72 # Prepend stdout with the tag
73 print tag + ' : ' + line,
74 sys.stdout.flush()
75 log.close()
76 os._exit(0)
77
78
79class subcommand:
80 def __init__(self, func, args, subdir, tee=True):
81 # func(args) - the subcommand to run
82 # subdir - the subdirectory to log results in
83 self.subdir = os.path.abspath(subdir)
84 if os.path.exists(self.subdir):
85 os.system("rm -rf %s" % self.subdir)
86 os.mkdir(self.subdir)
87 self.debug = os.path.join(self.subdir, 'debug')
88 self.stdout = os.path.join(self.debug, 'stdout')
89 self.stderr = os.path.join(self.debug, 'stderr')
90 os.mkdir(self.debug)
91 self.func = func
92 self.args = args
93 self.lambda_function = lambda: func(*args)
94 self.pid = None
95 self.tee = tee
96
97
98 def redirect_output(self):
99 tag = os.path.basename(self.subdir)
100 _redirect_stream_tee(1, self.stdout, tag)
101 _redirect_stream_tee(2, self.stderr, tag)
102
103
104 def fork_start(self):
105 sys.stdout.flush()
106 sys.stderr.flush()
107 self.pid = os.fork()
108
109 if self.pid: # I am the parent
110 return
111
112 # We are the child from this point on. Never return.
113 os.chdir(self.subdir)
114 self.redirect_output()
115
116 try:
117 self.lambda_function()
118
119 except:
120 raise
121 sys.stdout.flush()
122 sys.stderr.flush()
123 os._exit(1)
124
125 sys.stdout.flush()
126 sys.stderr.flush()
127 os._exit(0)
128
129
130 def fork_waitfor(self):
131 (pid, status) = os.waitpid(self.pid, 0)
132
133 if status != 0:
134 print "subcommand failed pid %d" % pid
135 print "%s(%s)" % (self.func, self.args)
136 print "rc=%d" % status
137 print
138 if os.path.exists(self.stderr):
139 for line in open(self.stderr).readlines():
140 print line,
141 print "\n--------------------------------------------\n"
142 return status