Add parallel support to new server
Signed-off-by: Ryan Stutsman <stutsman@google.com>
Adds support for parallel execution. Taken almost verbatim from old
autoserv code but with a few tweaks to access level on some functions.
Here is a tiny example of its usage in a server control file form:
import time
args = [str(i) for i in range(4)]
def f(a):
print "This is %s" % a
time.sleep(1)
print "This is %s; almost done" % a
time.sleep(1)
parallel_simple(f, args)
git-svn-id: http://test.kernel.org/svn/autotest/trunk@583 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/subcommand.py b/server/subcommand.py
new file mode 100644
index 0000000..67c6bff
--- /dev/null
+++ b/server/subcommand.py
@@ -0,0 +1,142 @@
+__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
+
+import sys, os, subprocess
+
+
+def parallel(tasklist):
+ """Run an set of predefined subcommands in parallel"""
+ pids = []
+ error = False
+ for task in tasklist:
+ task.fork_start()
+ for task in tasklist:
+ status = task.fork_waitfor()
+ if status != 0:
+ error = True
+ if error:
+ raise "One or more subcommands failed"
+
+
+def parallel_simple(function, arglist):
+ """Each element in the arglist used to create a subcommand object,
+ where that arg is used both as a subdir name, and a single argument
+ to pass to "function".
+ We create a subcommand object for each element in the list,
+ then execute those subcommand objects in parallel."""
+ subcommands = []
+ for arg in arglist:
+ subcommands.append(subcommand(function, [arg], arg))
+ parallel(subcommands)
+
+
+def __where_art_thy_filehandles():
+ os.system("ls -l /proc/%d/fd >> /dev/tty" % os.getpid())
+
+def __print_to_tty(string):
+ open('/dev/tty', 'w').write(string + '\n')
+
+
+def __redirect_stream(fd, output):
+ newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
+ os.dup2(newfd, fd)
+ os.close(newfd)
+ if fd == 1:
+ sys.stdout = os.fdopen(fd, 'w')
+ if fd == 2:
+ sys.stderr = os.fdopen(fd, 'w')
+
+
+def _redirect_stream_tee(fd, output, tag):
+ """Use the low-level fork & pipe operations here to get a fd,
+ not a filehandle. This ensures that we get both the
+ filehandle and fd for stdout/stderr redirected correctly."""
+ r, w = os.pipe()
+ pid = os.fork()
+ if pid: # Parent
+ os.dup2(w, fd)
+ os.close(r)
+ os.close(w)
+ if fd == 1:
+ sys.stdout = os.fdopen(fd, 'w', 1)
+ if fd == 2:
+ sys.stderr = os.fdopen(fd, 'w', 1)
+ return
+ else: # Child
+ os.close(w)
+ log = open(output, 'w')
+ f = os.fdopen(r, 'r')
+ for line in iter(f.readline, ''):
+ # Tee straight to file
+ log.write(line)
+ log.flush()
+ # Prepend stdout with the tag
+ print tag + ' : ' + line,
+ sys.stdout.flush()
+ log.close()
+ os._exit(0)
+
+
+class subcommand:
+ def __init__(self, func, args, subdir, tee=True):
+ # func(args) - the subcommand to run
+ # subdir - the subdirectory to log results in
+ self.subdir = os.path.abspath(subdir)
+ if os.path.exists(self.subdir):
+ os.system("rm -rf %s" % self.subdir)
+ os.mkdir(self.subdir)
+ self.debug = os.path.join(self.subdir, 'debug')
+ self.stdout = os.path.join(self.debug, 'stdout')
+ self.stderr = os.path.join(self.debug, 'stderr')
+ os.mkdir(self.debug)
+ self.func = func
+ self.args = args
+ self.lambda_function = lambda: func(*args)
+ self.pid = None
+ self.tee = tee
+
+
+ def redirect_output(self):
+ tag = os.path.basename(self.subdir)
+ _redirect_stream_tee(1, self.stdout, tag)
+ _redirect_stream_tee(2, self.stderr, tag)
+
+
+ def fork_start(self):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ self.pid = os.fork()
+
+ if self.pid: # I am the parent
+ return
+
+ # We are the child from this point on. Never return.
+ os.chdir(self.subdir)
+ self.redirect_output()
+
+ try:
+ self.lambda_function()
+
+ except:
+ raise
+ sys.stdout.flush()
+ sys.stderr.flush()
+ os._exit(1)
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+ os._exit(0)
+
+
+ def fork_waitfor(self):
+ (pid, status) = os.waitpid(self.pid, 0)
+
+ if status != 0:
+ print "subcommand failed pid %d" % pid
+ print "%s(%s)" % (self.func, self.args)
+ print "rc=%d" % status
+ print
+ if os.path.exists(self.stderr):
+ for line in open(self.stderr).readlines():
+ print line,
+ print "\n--------------------------------------------\n"
+ return status