blob: 6a03abfe1b13e5785292598c8f7c9b872c762278 [file] [log] [blame]
Derek Beckettdb735112020-08-27 10:25:15 -07001# Lint as: python2, python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
mblighbd1eb202008-07-29 22:09:29 +00005import os, sys
Derek Beckettdb735112020-08-27 10:25:15 -07006import six
7from six.moves import range
mblighbd1eb202008-07-29 22:09:29 +00008
9
class ParallelError(Exception):
    """Raised when one or more of the parallel functions did not succeed.

    Attributes:
        str: the complete error message; it is also the exception's only
             argument, so ``str(exc)`` returns it.
        errors: the individual error descriptions supplied by the raiser.
    """

    def __init__(self, str, errors):
        super(ParallelError, self).__init__(str)
        self.errors = errors
        self.str = str
15
16
class ParallelExecute(object):
    """Run callables in forked child processes, honouring dependencies.

    Each callable runs in its own child process (see ``_run``).  A callable
    is only started after every callable it depends on has exited with
    status 0, and no more than ``max_simultaneous_procs`` children are
    running at once.
    """

    def __init__(self, functions, max_simultaneous_procs=20):
        """\
        This takes in a dictionary of functions which map to a set of
        functions that they depend on.

        functions: This is either a list of or dictionary of functions to
                   be run. If it's a dictionary, the value should be a set
                   (or any iterable) of other functions this function is
                   dependent on. If it's a list (or tuple or anything
                   iterable that returns a single element each iteration),
                   then it's assumed that there are no dependencies. The
                   caller's mapping and dependency collections are copied
                   and never modified.

        max_simultaneous_procs: Throttle the number of processes we have
                                running at once. Must be at least 1 for any
                                function to be started.

        Raises KeyError if a dependency is not itself one of the functions.
        """
        if isinstance(functions, dict):
            # Copy both the mapping and every dependency collection: the
            # scheduler pops entries and removes satisfied dependencies as
            # it runs, which would otherwise empty the caller's own data.
            functions = dict((fn, set(deps))
                             for fn, deps in functions.items())
        else:
            functions = dict((fn, set()) for fn in functions)

        # Invert the graph: dependents[f] lists the functions waiting on f.
        dependents = dict((fn, []) for fn in functions)
        for fn, deps in functions.items():
            for dep in deps:
                dependents[dep].append(fn)

        self.max_procs = max_simultaneous_procs
        # Functions not yet started -> their still-unfinished dependencies.
        self.functions = functions
        self.dependents = dependents
        # pid of each running child -> the function it is executing.
        self.pid_map = {}
        # Functions whose dependencies have all completed successfully.
        self.ready_to_run = []

    @staticmethod
    def _flush_std_streams():
        """Best-effort flush of sys.stdout and sys.stderr."""
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except Exception:
                # A missing or closed stream must not stop the fork/exit.
                pass

    def _run(self, function):
        """Start ``function`` in a forked child process.

        The function is removed from ``self.functions``. In the parent the
        child's pid is recorded in ``self.pid_map`` and this returns
        immediately. The child never returns from this method: it exits
        with status 0 if ``function`` returns normally, with the requested
        status if it calls sys.exit(), and with status 1 (after printing a
        traceback to stderr) if it raises any other exception.
        """
        self.functions.pop(function)
        # Flush first so output still buffered in the parent is not written
        # a second time when the child flushes its copy of the buffers.
        self._flush_std_streams()
        pid = os.fork()
        if pid:
            self.pid_map[pid] = function
            return

        # Child process. Always leave through os._exit(): raising (or
        # sys.exit()) here would unwind through the child's copy of the
        # parent's call stack, running the callers' except/finally blocks
        # in the child, and would run the parent's atexit handlers here.
        exit_status = 1
        try:
            try:
                function()
                exit_status = 0
            except SystemExit as exc:
                # Mirror the interpreter's mapping of sys.exit() codes.
                if exc.code is None:
                    exit_status = 0
                elif isinstance(exc.code, int):
                    exit_status = exc.code
                else:
                    sys.stderr.write('%s\n' % (exc.code,))
            except BaseException:
                import traceback
                traceback.print_exc()
        finally:
            self._flush_std_streams()
            os._exit(exit_status)

    def run_until_completion(self):
        """Run every function, respecting dependencies, until none can run.

        Functions whose dependency failed are never started.

        NOTE: os.wait() reaps any child of this process; this assumes the
        caller has no other children, since an unknown pid makes
        ``self.pid_map.pop`` raise KeyError.

        Raises ParallelError if any child exited with a non-zero status, or,
        when none failed, if some functions could never become runnable
        (e.g. a dependency cycle), reported as "Deadlock detected".
        """
        for fn, deps in self.functions.items():
            if not deps:
                self.ready_to_run.append(fn)

        errors = []
        while self.pid_map or self.ready_to_run:
            # Start as many ready functions as the process cap allows.
            free_slots = self.max_procs - len(self.pid_map)
            for _ in range(min(free_slots, len(self.ready_to_run))):
                self._run(self.ready_to_run.pop())

            # Handle one proc that's finished.
            pid, status = os.wait()
            fn = self.pid_map.pop(pid)
            if status != 0:
                errors.append("%s failed" % fn.__name__)
                continue

            for dependent in self.dependents[fn]:
                remaining = self.functions[dependent]
                remaining.remove(fn)
                if not remaining:
                    self.ready_to_run.append(dependent)

        if self.functions and not errors:
            errors.append("Deadlock detected")

        if errors:
            msg = "Errors occurred during execution:"
            msg = '\n'.join([msg] + errors)
            raise ParallelError(msg, errors)
94
95
def redirect_io(log_file='/dev/null'):
    """Point stdin at /dev/null and append stdout/stderr to ``log_file``.

    The redirection is done on file descriptors 0, 1 and 2 with dup2(), so
    processes started afterwards inherit it. sys.stdin, sys.stdout and
    sys.stderr are then rebound to new file objects over those descriptors.

    log_file: path that both stdout and stderr are appended to. It is
              created with mode 0666 (subject to the umask) if missing;
              existing contents are preserved.
    """
    # Push out anything already buffered so it reaches the old destinations
    # instead of surfacing in the log file later.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            # Best effort: a missing or closed stream has nothing to flush.
            pass

    # Always redirect stdin.
    in_fd = os.open('/dev/null', os.O_RDONLY)
    try:
        os.dup2(in_fd, 0)
    finally:
        # If fd 0 was closed, os.open() may have returned 0 itself; closing
        # it then would undo the redirection.
        if in_fd != 0:
            os.close(in_fd)

    # O_APPEND: without it (and without O_TRUNC) writes start at offset 0
    # and overwrite an existing log in place, leaving stale trailing bytes.
    # The explicit mode avoids os.open's 0o777 default (executable logs).
    out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o666)
    try:
        os.dup2(out_fd, 2)
        os.dup2(out_fd, 1)
    finally:
        # Same guard as above for a previously closed stdout/stderr.
        if out_fd not in (1, 2):
            os.close(out_fd)

    sys.stdin = os.fdopen(0, 'r')
    sys.stdout = os.fdopen(1, 'w')
    sys.stderr = os.fdopen(2, 'w')