blob: 42dd49908d240c43e3582243582380b2bf6e94c8 [file] [log] [blame]
mbligh67647152008-11-19 00:18:14 +00001# Copyright Martin J. Bligh, Google Inc 2008
2# Released under the GPL v2
3
"""
This module allows you to communicate with the frontend to submit jobs, etc.
It is designed for writing more sophisticated server-side control files that
can recursively add and manage other jobs.

We turn the JSON dictionaries into real objects that are more idiomatic.

For docs, see http://autotest/afe/server/noauth/rpc/
and http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api
"""
14
mbligh5b618382008-12-03 15:24:01 +000015import os, time, traceback
mbligh67647152008-11-19 00:18:14 +000016import common
17from autotest_lib.frontend.afe import rpc_client_lib
18from autotest_lib.client.common_lib import utils
19
20
def dump_object(header, obj):
    """
    Render a frontend object (job, host, acl, label, ...) as readable text.

    The result is `header` on its own line, followed by one line per entry
    of obj.hash formatted as a right-aligned '%20s: %s' key/value pair.
    The 'afe' and 'hash' keys are skipped. Every line, including the last,
    ends with a newline. Intended for debugging output.
    """
    skipped_keys = ('afe', 'hash')
    lines = [header]
    for field in obj.hash:
        if field not in skipped_keys:
            lines.append('%20s: %s' % (field, obj.hash[field]))
    return '\n'.join(lines) + '\n'
32
33
class afe(object):
    """
    AFE class for communicating with the autotest frontend

    All the constructors go in the afe class.
    Manipulating methods go in the classes themselves
    """
    # Host statuses that host_statuses() treats as "not live".
    DEAD_STATUSES = ('Dead', 'Repair Failed')


    def __init__(self, user=None, web_server='http://autotest',
                 print_log=True, debug=False):
        """
        Create a cached instance of a connection to the AFE

        user: username to connect as; None (the default) means the LOGNAME
              environment variable, read when the instance is created rather
              than once when this module is imported
        web_server: AFE instance to connect to
        print_log: print the messages passed to log() (used by the host and
                   ACL mutating methods) to stdout
        debug: print out all RPC traffic
        """
        if user is None:
            user = os.environ.get('LOGNAME')
        self.user = user
        self.print_log = print_log
        self.debug = debug
        headers = {'AUTHORIZATION': self.user}
        rpc_server = web_server + '/afe/server/noauth/rpc/'
        self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers)


    def run(self, call, **dargs):
        """
        Make a RPC call to the AFE server

        call: name of the RPC method to invoke on the proxy
        dargs: keyword arguments forwarded to the RPC (only keyword
               arguments are supported)
        Returns the RPC result passed through utils.strip_unicode.
        """
        rpc_call = getattr(self.proxy, call)
        if self.debug:
            print('DEBUG: %s %s' % (call, dargs))
        return utils.strip_unicode(rpc_call(**dargs))


    def log(self, message):
        """Print message to stdout when print_log is enabled."""
        if self.print_log:
            print(message)


    def host_statuses(self, live=None):
        """
        Return host status names.

        live: None -> every status the server reports;
              True -> the server's statuses minus DEAD_STATUSES;
              False -> DEAD_STATUSES (no RPC is made)
        """
        dead_statuses = list(self.DEAD_STATUSES)
        # Compare with == on purpose: None ("all statuses") is a distinct,
        # meaningful value and must not be treated like False.
        if live == False:
            return dead_statuses
        statuses = self.run('get_static_data')['host_statuses']
        if live == True:
            # Keep the server's ordering instead of scrambling it via a set.
            return [status for status in statuses
                    if status not in dead_statuses]
        return statuses


    def get_hosts(self, **dargs):
        """Return host objects matching the get_hosts RPC filters in dargs."""
        hosts = self.run('get_hosts', **dargs)
        return [host(self, h) for h in hosts]


    def create_host(self, hostname, **dargs):
        """
        Create a host via the add_host RPC and return it as a host object.

        hostname: name of the host to add
        dargs: any other add_host RPC arguments
        """
        id = self.run('add_host', hostname=hostname, **dargs)
        return self.get_hosts(id=id)[0]


    def get_labels(self, **dargs):
        """Return label objects matching the get_labels RPC filters."""
        labels = self.run('get_labels', **dargs)
        return [label(self, l) for l in labels]


    def create_label(self, name, **dargs):
        """
        Create a label via the add_label RPC and return it as a label object.

        name: name of the label to add
        dargs: any other add_label RPC arguments
        """
        id = self.run('add_label', name=name, **dargs)
        return self.get_labels(id=id)[0]


    def get_acls(self, **dargs):
        """Return acl objects matching the get_acl_groups RPC filters."""
        acls = self.run('get_acl_groups', **dargs)
        return [acl(self, a) for a in acls]


    def create_acl(self, name, **dargs):
        """
        Create an ACL group via the add_acl_group RPC and return it.

        name: name of the ACL group to add
        dargs: any other add_acl_group RPC arguments
        """
        id = self.run('add_acl_group', name=name, **dargs)
        return self.get_acls(id=id)[0]


    def get_jobs(self, summary=False, **dargs):
        """
        Return job objects matching the RPC filters in dargs.

        summary: use the get_jobs_summary RPC instead of get_jobs
        """
        if summary:
            jobs_data = self.run('get_jobs_summary', **dargs)
        else:
            jobs_data = self.run('get_jobs', **dargs)
        return [job(self, j) for j in jobs_data]


    def get_host_queue_entries(self, **data):
        """Return job_status objects from the get_host_queue_entries RPC."""
        entries = self.run('get_host_queue_entries', **data)
        return [job_status(self, e) for e in entries]


    def create_job_by_test(self, tests, kernel=None, **dargs):
        """
        Given a list of test names, fetch the appropriate control file from
        the server and submit it

        tests: test names passed to the generate_control_file RPC
        kernel: optional kernel passed to generate_control_file
        dargs: extra create_job arguments; any 'dependencies' given here are
               extended with the ones the generated control file requires
        Returns the created job object.
        """
        results = self.run('generate_control_file', tests=tests, kernel=kernel,
                           use_container=False, do_push_packages=True)
        if results['is_server']:
            dargs['control_type'] = 'Server'
        else:
            dargs['control_type'] = 'Client'
        dargs['dependencies'] = dargs.get('dependencies', []) + \
                                results['dependencies']
        dargs['control_file'] = results['control_file']
        dargs['synch_count'] = results['synch_count']
        return self.create_job(**dargs)


    def create_job(self, control_file, name=' ', priority='Medium',
                   control_type='Client', **dargs):
        """Submit a job via the create_job RPC and return it as a job object."""
        id = self.run('create_job', name=name, priority=priority,
                      control_file=control_file, control_type=control_type,
                      **dargs)
        return self.get_jobs(id=id)[0]


    def run_test_suites(self, pairings, kernel, kernel_label, wait=True,
                        poll_interval=5):
        """
        Run a list of test suites on a particular kernel.

        Poll for them to complete, and return whether they worked or not.

        pairings: list of MachineTestPairing objects to invoke
        kernel: name of the kernel to run
        kernel_label: label of the kernel to run
                      (<kernel-version> : <config> : <date>)
        wait: boolean - wait for the results to come back?
        poll_interval: interval between polling for job results (in minutes)

        Returns None immediately when wait is False; otherwise the first
        non-None result of poll_all_jobs (True or False).
        """
        jobs = [self.invoke_test(pairing, kernel, kernel_label)
                for pairing in pairings]
        if not wait:
            return None
        while True:
            time.sleep(60 * poll_interval)
            result = self.poll_all_jobs(jobs)
            if result is not None:
                return result


    def poll_all_jobs(self, jobs):
        """
        Poll all jobs in a list. See whether they are:
          a) All complete successfully (return True)
          b) One or more has failed (return False)
          c) Cannot tell yet (return None)

        A pending job does not stop the scan, so a failure in a later job is
        still reported as False. Polling stops at the first failure.
        """
        any_pending = False
        for polled_job in jobs:
            result = self.poll_job_results(polled_job.id, debug=False)
            if result is None:
                print('PENDING %s : %s' % (polled_job.id, polled_job.name))
                any_pending = True
            elif result:
                print('PASSED %s : %s' % (polled_job.id, polled_job.name))
            else:
                print('FAILED %s : %s' % (polled_job.id, polled_job.name))
                return False
        if any_pending:
            return None
        return True


    def invoke_test(self, pairing, kernel, kernel_label, priority='Medium'):
        """
        Given a pairing of a control file to a machine label, find all machines
        with that label, and submit that control file to them.

        Returns a job object
        """
        job_name = '%s : %s' % (pairing.machine_label, kernel_label)
        hosts = self.get_hosts(multiple_labels=[pairing.machine_label])
        # 'h' rather than 'host' so the module-level host class is not
        # shadowed (Python 2 list comprehensions leak their loop variable).
        host_list = [h.hostname for h in hosts]
        new_job = self.create_job_by_test(name=job_name,
                                          dependencies=[pairing.machine_label],
                                          tests=[pairing.control_file],
                                          priority=priority,
                                          hosts=host_list,
                                          kernel=kernel)
        print('Invoked test %s : %s' % (new_job.id, job_name))
        return new_job


    def poll_job_results(self, job_id, debug=False):
        """
        Analyse all job results by platform, return:

        False: if any platform has more than one failure
        None: if any platform has more than one machine not yet Good,
              or if fetching the host queue entries raised.
        True: if all platforms have at least all-but-one machines Good.
        """
        try:
            job_statuses = self.get_host_queue_entries(job=job_id)
        except Exception:
            # Deliberately best-effort: an RPC error means "can't tell yet",
            # so the caller simply polls again later.
            print("Ignoring exception on poll job; RPC interface is flaky")
            traceback.print_exc()
            return None

        # platform -> {'Total': [hostnames], <status>: [hostnames], ...}
        platform_map = {}
        for entry in job_statuses:
            # NOTE(review): assumes every entry has a host assigned;
            # job_status leaves host falsy otherwise and this would raise.
            hostname = entry.host.hostname
            platform = entry.host.platform
            by_status = platform_map.setdefault(platform, {'Total': []})
            by_status['Total'].append(hostname)
            by_status.setdefault(entry.status, []).append(hostname)

        good_platforms = []
        bad_platforms = []
        unknown_platforms = []
        for platform, by_status in platform_map.items():
            total = len(by_status['Total'])
            completed = len(by_status.get('Completed', []))
            failed = len(by_status.get('Failed', []))
            if failed > 1:
                bad_platforms.append(platform)
            elif completed + 1 >= total:
                # if all or all but one are good, call the job good.
                good_platforms.append(platform)
            else:
                unknown_platforms.append(platform)
            if debug:
                detail = ['%s=%s' % (status, hostnames)
                          for status, hostnames in by_status.items()
                          if status != 'Total']
                print('%20s %d/%d %s' % (platform, completed, total,
                                         ' '.join(detail)))
                print('')

        # str() each name: a host without a platform label has platform None,
        # which ' '.join() would reject with a TypeError.
        if bad_platforms:
            if debug:
                platform_list = ' '.join([str(p) for p in bad_platforms])
                print('Result bad - platforms: ' + platform_list)
            return False
        if unknown_platforms:
            if debug:
                platform_list = ' '.join([str(p) for p in unknown_platforms])
                print('Result unknown - platforms:  ' + platform_list)
            return None
        if debug:
            platform_list = ' '.join([str(p) for p in good_platforms])
            print('Result good - all platforms passed:  ' + platform_list)
        return True
283
284
class rpc_object(object):
    """
    Generic object used to construct python objects from rpc calls

    Each key of the RPC result dict becomes an attribute of the object.
    The owning afe connection is kept as self.afe and the raw dict as
    self.hash; keys in the dict are applied last, so they win on a clash.
    """
    def __init__(self, afe, hash):
        self.hash = hash
        self.afe = afe
        vars(self).update(hash)


    def __str__(self):
        header = self.__repr__()
        return dump_object(header, self)
297
298
class label(rpc_object):
    """
    AFE label object

    Fields:
        name, invalid, platform, kernel_config, id, only_if_needed
    """
    def __repr__(self):
        return 'LABEL: %s' % self.name


    def add_hosts(self, hosts):
        """
        Apply this label to hosts via the label_add_hosts RPC.

        hosts: hosts to label, forwarded to the RPC unchanged
        Returns the RPC result.
        """
        self.afe.log('Adding hosts %s to label %s' % (hosts, self.name))
        # afe.run() forwards only keyword arguments to the RPC, so the label
        # id and host list must be passed by name; positional arguments
        # raise TypeError.
        return self.afe.run('label_add_hosts', id=self.id, hosts=hosts)


    def remove_hosts(self, hosts):
        """
        Remove this label from hosts via the label_remove_hosts RPC.

        hosts: hosts to unlabel, forwarded to the RPC unchanged
        Returns the RPC result.
        """
        self.afe.log('Removing hosts %s from label %s' % (hosts, self.name))
        return self.afe.run('label_remove_hosts', id=self.id, hosts=hosts)
316
317
class acl(rpc_object):
    """
    AFE acl object

    Fields:
        users, hosts, description, name, id
    """
    def __repr__(self):
        return 'ACL: %s' % self.name


    def add_hosts(self, hosts):
        """
        Add hosts to this ACL group via the acl_group_add_hosts RPC.

        hosts: hosts to add, forwarded to the RPC unchanged
        Returns the RPC result.
        """
        self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name))
        # afe.run() forwards only keyword arguments to the RPC; passing the
        # id and hosts positionally raises TypeError.
        return self.afe.run('acl_group_add_hosts', id=self.id, hosts=hosts)


    def remove_hosts(self, hosts):
        """
        Remove hosts from this ACL group via the acl_group_remove_hosts RPC.

        hosts: hosts to remove, forwarded to the RPC unchanged
        Returns the RPC result.
        """
        self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name))
        return self.afe.run('acl_group_remove_hosts', id=self.id, hosts=hosts)
337
338
class job(rpc_object):
    """
    AFE job object, built from one entry of a get_jobs RPC result.

    Fields:
        name, control_file, control_type, synch_count, reboot_before,
        run_verify, priority, email_list, created_on, dependencies,
        timeout, owner, reboot_after, id
    """
    def __repr__(self):
        job_id = self.id
        return 'JOB: %s' % job_id
350
351
class job_status(rpc_object):
    """
    AFE job_status object

    Fields:
        status, complete, deleted, meta_host, host, active, execution_subdir, id
    """
    def __init__(self, afe, hash):
        super(job_status, self).__init__(afe, hash)
        # Replace the raw 'job' dict with a job object.
        self.job = job(afe, self.job)
        if self.host:
            # Replace the raw host dict with a full host object looked up
            # by hostname; a falsy host is left as-is.
            self.host = afe.get_hosts(hostname=self.host['hostname'])[0]


    def __repr__(self):
        # The host may be unset (falsy) for an entry with no host assigned;
        # don't crash when printing such an entry.
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = None
        return 'JOB STATUS: %s-%s' % (self.job.id, hostname)
371
372
class host(rpc_object):
    """
    AFE host object

    Fields:
        status, lock_time, locked_by, locked, hostname, invalid,
        synch_id, labels, platform, protection, dirty, id
    """
    def __repr__(self):
        return 'HOST OBJECT: %s' % self.hostname


    def show(self):
        """Print a one-line summary: hostname, status, locked, platform, labels."""
        # Omit the platform (it has its own column) while keeping the
        # server's label order; a set difference would scramble it.
        labels = [name for name in self.labels if name != self.platform]
        print('%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status,
                                           self.locked, self.platform,
                                           ', '.join(labels)))


    def get_acls(self):
        """Return the ACL groups that contain this host."""
        return self.afe.get_acls(hosts__hostname=self.hostname)


    def add_acl(self, acl_name):
        """Add this host to the ACL group identified by acl_name."""
        self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_add_hosts', id=acl_name,
                            hosts=[self.hostname])


    def remove_acl(self, acl_name):
        """Remove this host from the ACL group identified by acl_name."""
        self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_remove_hosts', id=acl_name,
                            hosts=[self.hostname])


    def get_labels(self):
        """Return the label objects attached to this host."""
        return self.afe.get_labels(host__hostname__in=[self.hostname])


    def add_labels(self, labels):
        """Attach labels to this host via the host_add_labels RPC."""
        self.afe.log('Adding labels %s to host %s' % (labels, self.hostname))
        return self.afe.run('host_add_labels', id=self.id, labels=labels)


    def remove_labels(self, labels):
        """Detach labels from this host via the host_remove_labels RPC."""
        self.afe.log('Removing labels %s from host %s' % (labels,self.hostname))
        return self.afe.run('host_remove_labels', id=self.id, labels=labels)
mbligh5b618382008-12-03 15:24:01 +0000420
421
class MachineTestPairing(object):
    """
    Pairs a machine label with the control file (test) to run on it.

    machine_label: label used to select the hosts to run on
    control_file: test passed as `tests` when afe.invoke_test creates the job
    """
    def __init__(self, machine_label, control_file):
        self.machine_label, self.control_file = machine_label, control_file
428 self.control_file = control_file