#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Client tool to trigger tasks or retrieve results from a Swarming server."""

__version__ = '0.1'

import hashlib
import json
import logging
import os
import re
import shutil
import StringIO
import subprocess
import sys
import time
import urllib
import zipfile

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand
from utils import tools
from utils import threading_utils

import run_isolated


ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')


# Default servers.
# TODO(maruel): Chromium-specific.
ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'
SWARM_SERVER = 'https://chromium-swarm-dev.appspot.com'


# The default time to wait for a shard to finish running, in seconds.
DEFAULT_SHARD_WAIT_TIME = 40 * 60.


NO_OUTPUT_FOUND = (
    'No output produced by the test, it may have failed to run.\n'
    '\n')


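# Maps sys.platform values to the OS names understood by the Swarm server;
# the value is used both as the config name and the 'os' dimension.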
PLATFORM_MAPPING = {
  'cygwin': 'Windows',
  'darwin': 'Mac',
  'linux2': 'Linux',
  'win32': 'Windows',
}


class Failure(Exception):
  """Generic failure."""
  pass


class Manifest(object):
  """Represents a Swarming task manifest.

  Also includes code to zip code and upload itself.
  """
  def __init__(
      self, manifest_hash, test_name, shards, test_filter, slave_os,
      working_dir, isolate_server, verbose, profile, priority):
    """Populates a manifest object.

    Args:
      manifest_hash - The manifest's sha-1 that the slave is going to fetch.
      test_name - The name to give the test request.
      shards - The number of swarm shards to request.
      test_filter - The gtest filter to apply when running the test.
      slave_os - OS to run on.
      working_dir - Relative working directory to start the script in.
      isolate_server - isolate server url.
      verbose - if True, have the slave print more details.
      profile - if True, have the slave print more timing data.
      priority - int between 0 and 1000; the lower the value, the higher the
          priority.
    """
    self.manifest_hash = manifest_hash
    self._test_name = test_name
    self._shards = shards
    self._test_filter = test_filter
    self._target_platform = PLATFORM_MAPPING[slave_os]
    self._working_dir = working_dir

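    # Isolate server endpoints: blob retrieval and storage, a batched
    # existence check, and the access token fetch used by the store/contains
    # calls below.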
    self.data_server_retrieval = isolate_server + '/content/retrieve/default/'
    self._data_server_storage = isolate_server + '/content/store/default/'
    self._data_server_has = isolate_server + '/content/contains/default'
    self._data_server_get_token = isolate_server + '/content/get_token'

    self.verbose = bool(verbose)
    self.profile = bool(profile)
    self.priority = priority

    self._zip_file_hash = ''
    self._tasks = []
    self._files = {}
    self._token_cache = None

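  # The isolate server requires a token on store/contains requests; fetch it
  # once and cache it for the lifetime of this Manifest.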
  def _token(self):
    if not self._token_cache:
      result = run_isolated.url_open(self._data_server_get_token)
      if not result:
        # TODO(maruel): Implement authentication.
        raise Failure('Failed to get token, need authentication')
      # Quote it right away, so creating the urls is simpler.
      self._token_cache = urllib.quote(result.read())
    return self._token_cache

  def add_task(self, task_name, actions, time_out=600):
    """Appends a new task to the swarm manifest file."""
    # See swarming/src/common/test_request_message.py TestObject constructor
    # for the valid flags.
    self._tasks.append(
        {
          'action': actions,
          'decorate_output': self.verbose,
          'test_name': task_name,
          'time_out': time_out,
        })

  def add_file(self, source_path, rel_path):
    self._files[source_path] = rel_path

  def zip_and_upload(self):
    """Zips up all the files necessary to run a shard and uploads to Swarming
    master.
    """
    assert not self._zip_file_hash
    start_time = time.time()

    zip_memory_file = StringIO.StringIO()
    zip_file = zipfile.ZipFile(zip_memory_file, 'w')

    for source, relpath in self._files.iteritems():
      zip_file.write(source, relpath)

    zip_file.close()
    print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)

    zip_memory_file.flush()
    zip_contents = zip_memory_file.getvalue()
    zip_memory_file.close()

    self._zip_file_hash = hashlib.sha1(zip_contents).hexdigest()

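    # Ask the server whether it already has this zip before uploading;
    # /content/contains answers with one byte per queried hash, chr(1)
    # meaning the blob is already present.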
    response = run_isolated.url_open(
        self._data_server_has + '?token=%s' % self._token(),
        data=self._zip_file_hash,
        content_type='application/octet-stream')
    if response is None:
      print >> sys.stderr, (
          'Unable to query server for zip file presence, aborting.')
      return False

    if response.read(1) == chr(1):
      print 'Zip file already on server, no need to reupload.'
      return True

    print 'Zip file not on server, starting upload.'

    url = '%s%s?priority=0&token=%s' % (
        self._data_server_storage, self._zip_file_hash, self._token())
    response = run_isolated.url_open(
        url, data=zip_contents, content_type='application/octet-stream')
    if response is None:
      print >> sys.stderr, 'Failed to upload the zip file: %s' % url
      return False

    return True

  def to_json(self):
    """Exports the current configuration into a swarm-readable manifest file.

    This function doesn't mutate the object.
    """
    test_case = {
      'test_case_name': self._test_name,
      'data': [
        [self.data_server_retrieval + urllib.quote(self._zip_file_hash),
         'swarm_data.zip'],
      ],
      'tests': self._tasks,
      'env_vars': {},
      'configurations': [
        {
          'min_instances': self._shards,
          'config_name': self._target_platform,
          'dimensions': {
            'os': self._target_platform,
          },
        },
      ],
      'working_dir': self._working_dir,
      'restart_on_failure': True,
      'cleanup': 'root',
      'priority': self.priority,
    }

    # These flags are googletest specific.
    if self._test_filter and self._test_filter != '*':
      test_case['env_vars']['GTEST_FILTER'] = self._test_filter
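    # When sharding, %(instance_index)s and %(num_instances)s are expanded
    # per shard instance on the Swarm side (see test_request_message.py), so
    # each shard runs a distinct slice of the test suite.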
    if self._shards > 1:
      test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
      test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'

    return json.dumps(test_case, separators=(',', ':'))


def now():
  """Exists so it can be mocked easily."""
  return time.time()


def get_test_keys(swarm_base_url, test_name):
  """Returns the Swarm test key for each shard of test_name."""
  key_data = urllib.urlencode([('name', test_name)])
  url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)

  for i in range(run_isolated.URL_OPEN_MAX_ATTEMPTS):
    response = run_isolated.url_open(url, retry_404=True)
    if response is None:
      raise Failure(
          'Error: Unable to find any tests with the name, %s, on swarm server'
          % test_name)

    result = response.read()
    # TODO(maruel): Compare exact string.
    if 'No matching' in result:
      logging.warning('Unable to find any tests with the name, %s, on swarm '
                      'server' % test_name)
      # i never reaches URL_OPEN_MAX_ATTEMPTS itself, so compare against the
      # last index to skip the pointless sleep on the final attempt.
      if i != run_isolated.URL_OPEN_MAX_ATTEMPTS - 1:
        run_isolated.HttpService.sleep_before_retry(i, None)
      continue

    return json.loads(result)

  raise Failure(
      'Error: Unable to find any tests with the name, %s, on swarm server'
      % test_name)


def retrieve_results(base_url, test_key, timeout, should_stop):
  """Retrieves results for a single test_key."""
  assert isinstance(timeout, float)
  params = [('r', test_key)]
  result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
  start = now()
  while True:
    if timeout and (now() - start) >= timeout:
      logging.error('retrieve_results(%s) timed out', base_url)
      return {}
    # Do retries ourselves.
    response = run_isolated.url_open(
        result_url, retry_404=False, retry_50x=False)
    if response is None:
      # Aggressively poll for results. Do not use retry_404 so
      # should_stop is polled more often.
      remaining = min(5, timeout - (now() - start)) if timeout else 5
      if remaining > 0:
        run_isolated.HttpService.sleep_before_retry(1, remaining)
    else:
      try:
        data = json.load(response) or {}
      except (ValueError, TypeError):
        logging.warning(
            'Received corrupted data for test_key %s. Retrying.', test_key)
      else:
        if data['output']:
          return data
    if should_stop.get():
      return {}


def yield_results(swarm_base_url, test_keys, timeout, max_threads):
  """Yields swarm test results from the swarm server as (index, result).

  Duplicate shards are ignored; the first one to complete is returned.

  max_threads is optional and limits the number of parallel fetches. Since the
  number of test_keys is generally 10 or fewer, limiting the thread count is
  normally not worth it; it is mostly used for testing.
  """
  shards_remaining = range(len(test_keys))
  number_threads = (
      min(max_threads, len(test_keys)) if max_threads else len(test_keys))
  should_stop = threading_utils.Bit()
  results_remaining = len(test_keys)
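  # Use one worker per test key (bounded by max_threads) so every shard is
  # polled concurrently.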
  with threading_utils.ThreadPool(number_threads, number_threads, 0) as pool:
    try:
      for test_key in test_keys:
        pool.add_task(
            0, retrieve_results, swarm_base_url, test_key, timeout,
            should_stop)
      while shards_remaining and results_remaining:
        result = pool.get_one_result()
        results_remaining -= 1
        if not result:
          # Failed to retrieve one key.
          logging.error('Failed to retrieve the results for a swarm key')
          continue
        shard_index = result['config_instance_index']
        if shard_index in shards_remaining:
          shards_remaining.remove(shard_index)
          yield shard_index, result
        else:
          logging.warning('Ignoring duplicate shard index %d', shard_index)
          # Pop the last entry, there's no such shard.
          shards_remaining.pop()
    finally:
      # Done, kill the remaining threads.
      should_stop.set()


def chromium_setup(manifest):
  """Sets up the commands to run.

  Highly chromium specific.
  """
  cleanup_script_name = 'swarm_cleanup.py'
  cleanup_script_path = os.path.join(TOOLS_PATH, cleanup_script_name)
  run_test_name = 'run_isolated.py'
  run_test_path = os.path.join(ROOT_DIR, run_test_name)

  manifest.add_file(run_test_path, run_test_name)
  manifest.add_file(cleanup_script_path, cleanup_script_name)
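  # Point run_isolated.py at the '-gzip' flavor of the retrieval URL
  # (.../content/retrieve/default-gzip/) so the slave downloads compressed
  # data.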
  run_cmd = [
    'python', run_test_name,
    '--hash', manifest.manifest_hash,
    '--remote', manifest.data_server_retrieval.rstrip('/') + '-gzip/',
  ]
  if manifest.verbose or manifest.profile:
    # Have it print the profiling section.
    run_cmd.append('--verbose')
  manifest.add_task('Run Test', run_cmd)

  # Clean up.
  manifest.add_task('Clean Up', ['python', cleanup_script_name])


def archive(isolated, isolate_server, verbose):
  """Archives a .isolated file and all its dependencies on the
  content-addressed cache (CAC).

  Returns the sha-1 of the .isolated file on success, None on failure.
  """
  tempdir = None
  try:
    logging.info('Archiving')
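    # Delegate to isolate.py's 'hashtable' mode; --outdir is the isolate
    # server URL here, so the files are uploaded rather than written locally.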
    cmd = [
      sys.executable,
      os.path.join(ROOT_DIR, 'isolate.py'),
      'hashtable',
      '--outdir', isolate_server,
      '--isolated', isolated,
    ]
    if verbose:
      cmd.append('--verbose')
    logging.info(' '.join(cmd))
    if subprocess.call(cmd):
      return
    return hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
  finally:
    if tempdir:
      shutil.rmtree(tempdir)


def process_manifest(
    file_sha1_or_isolated, test_name, shards, test_filter, slave_os,
    working_dir, isolate_server, swarming, verbose, profile, priority):
  """Processes the manifest file and sends off the swarm test request.

  Optionally archives an .isolated file first.
  """
  if file_sha1_or_isolated.endswith('.isolated'):
    file_sha1 = archive(file_sha1_or_isolated, isolate_server, verbose)
    if not file_sha1:
      print >> sys.stderr, 'Archival failure %s' % file_sha1_or_isolated
      return 1
  elif re.match(r'^[a-f0-9]{40}$', file_sha1_or_isolated):
    file_sha1 = file_sha1_or_isolated
  else:
    print >> sys.stderr, 'Invalid hash %s' % file_sha1_or_isolated
    return 1

  try:
    manifest = Manifest(
        file_sha1, test_name, shards, test_filter, slave_os,
        working_dir, isolate_server, verbose, profile, priority)
  except ValueError as e:
    print >> sys.stderr, 'Unable to process %s: %s' % (test_name, e)
    return 1

  chromium_setup(manifest)

  # Zip up relevant files.
  print('Zipping up files...')
  if not manifest.zip_and_upload():
    return 1

  # Send test requests off to swarm.
  print('Sending test requests to swarm.')
  print('Server: %s' % swarming)
  print('Job name: %s' % test_name)
  test_url = swarming + '/test'
  manifest_text = manifest.to_json()
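  # A successful trigger returns a JSON body; an unparseable response is
  # treated as a failed request.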
  result = run_isolated.url_open(test_url, data={'request': manifest_text})
  if not result:
    print >> sys.stderr, 'Failed to send test for %s\n%s' % (
        test_name, test_url)
    return 1
  try:
    json.load(result)
  except (ValueError, TypeError) as e:
    print >> sys.stderr, 'Failed to send test for %s' % test_name
    print >> sys.stderr, 'Manifest: %s' % manifest_text
    print >> sys.stderr, str(e)
    return 1
  return 0


def trigger(
    slave_os,
    tasks,
    task_prefix,
    working_dir,
    isolate_server,
    swarming,
    verbose,
    profile,
    priority):
  """Sends off the hash swarming test requests."""
  highest_exit_code = 0
  for (file_sha1, test_name, shards, testfilter) in tasks:
    # TODO(maruel): It should first create a request manifest object, then
    # pass it to a function to zip, archive and trigger.
    exit_code = process_manifest(
        file_sha1,
        task_prefix + test_name,
        int(shards),
        testfilter,
        slave_os,
        working_dir,
        isolate_server,
        swarming,
        verbose,
        profile,
        priority)
    highest_exit_code = max(highest_exit_code, exit_code)
  return highest_exit_code


def decorate_shard_output(result, shard_exit_code):
  """Returns wrapped output for a swarming task shard."""
  tag = 'index %s (machine tag: %s, id: %s)' % (
      result['config_instance_index'],
      result.get('machine_tag', 'unknown'),
      result['machine_id'])
  return (
      '\n'
      '================================================================\n'
      'Begin output from shard %s\n'
      '================================================================\n'
      '\n'
      '%s'
      '================================================================\n'
      'End output from shard %s. Return %d\n'
      '================================================================\n'
      ) % (tag, result['output'] or NO_OUTPUT_FOUND, tag, shard_exit_code)


def collect(url, test_name, timeout, decorate):
  """Retrieves results of a Swarming job."""
  test_keys = get_test_keys(url, test_name)
  if not test_keys:
    raise Failure('No test keys to get results with.')

  exit_code = 0
  for _index, output in yield_results(url, test_keys, timeout, None):
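    # 'exit_codes' is a comma-separated string; treat a missing value as a
    # failure ('1') and keep the worst (highest) code.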
    shard_exit_codes = (output['exit_codes'] or '1').split(',')
    shard_exit_code = max(int(i) for i in shard_exit_codes)
    # Report the highest exit code seen across all shards.
    exit_code = max(exit_code, shard_exit_code)
    if decorate:
      print decorate_shard_output(output, shard_exit_code)
    else:
      print(
          '%s/%s: %s' % (
              output['machine_id'],
              output['machine_tag'],
              output['exit_codes']))
      print(''.join('  %s\n' % l for l in output['output'].splitlines()))
  return exit_code


def add_trigger_options(parser):
  """Adds all options to trigger a task on Swarming."""
  parser.add_option(
      '-I', '--isolate-server',
      default=ISOLATE_SERVER,
      metavar='URL',
      help='Isolate server where data is stored. default: %default')
  parser.add_option(
      '-w', '--working_dir', default='swarm_tests',
      help='Working directory on the swarm slave side. default: %default.')
  parser.add_option(
      '-o', '--os', default=sys.platform,
      help='Swarm OS image to request. Should be one of the valid '
           'sys.platform values like darwin, linux2 or win32. '
           'default: %default.')
  parser.add_option(
      '-T', '--task-prefix', default='',
      help='Prefix to give the swarm test request. default: %default')
  parser.add_option(
      '--profile', action='store_true',
      default=bool(os.environ.get('ISOLATE_DEBUG')),
      help='Have run_isolated.py print profiling info')
  parser.add_option(
      '--priority', type='int', default=100,
      help='The lower the value, the more important the task is')


def process_trigger_options(parser, options):
  options.isolate_server = options.isolate_server.rstrip('/')
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  if options.os in ('', 'None'):
    # Use the current OS.
    options.os = sys.platform
  if options.os not in PLATFORM_MAPPING:
    parser.error('Invalid --os option.')


def add_collect_options(parser):
  parser.add_option(
      '-t', '--timeout',
      type='float',
      default=DEFAULT_SHARD_WAIT_TIME,
      help='Timeout to wait for result, set to 0 for no timeout; default: '
           '%default s')
  parser.add_option('--decorate', action='store_true', help='Decorate output')


@subcommand.usage('test_name')
def CMDcollect(parser, args):
  """Retrieves results of a Swarming job.

  The result can be in multiple parts if the execution was sharded. It can
  potentially have retries.
  """
  add_collect_options(parser)
  (options, args) = parser.parse_args(args)
  if not args:
    parser.error('Must specify one test name.')
  elif len(args) > 1:
    parser.error('Must specify only one test name.')

  try:
    return collect(
        options.swarming, args[0], options.timeout, options.decorate)
  except Failure as e:
    parser.error(e.args[0])


@subcommand.usage('[sha1|isolated ...]')
def CMDrun(parser, args):
  """Triggers a job and waits for the results.

  Basically, does everything to run command(s) remotely.
  """
  add_trigger_options(parser)
  add_collect_options(parser)
  options, args = parser.parse_args(args)

  if not args:
    parser.error('Must pass at least one .isolated file or its sha1.')
  process_trigger_options(parser, options)

  success = []
  for arg in args:
    logging.info('Triggering %s', arg)
    try:
      result = trigger(
          options.os,
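          # One single-shard, unfiltered task per argument:
          # (hash_or_isolated, test_name, shards, test_filter).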
          [(arg, os.path.basename(arg), '1', '')],
          options.task_prefix,
          options.working_dir,
          options.isolate_server,
          options.swarming,
          options.verbose,
          options.profile,
          options.priority)
    except Failure as e:
      result = e.args[0]
    if result:
      print >> sys.stderr, 'Failed to trigger %s: %s' % (arg, result)
    else:
      success.append(os.path.basename(arg))

  if not success:
    print >> sys.stderr, 'Failed to trigger any job.'
    return result

  code = 0
  for arg in success:
    logging.info('Collecting %s', arg)
    try:
      new_code = collect(
          options.swarming,
          options.task_prefix + arg,
          options.timeout,
          options.decorate)
      code = max(code, new_code)
    except Failure as e:
      code = max(code, 1)
      print >> sys.stderr, e.args[0]
  return code


def CMDtrigger(parser, args):
  """Triggers Swarm request(s).

  Accepts one or multiple --task requests, each with either the sha1 of a
  .isolated file already uploaded or the path to a .isolated file to archive.
  Packages the files if needed and sends a Swarm manifest file to the Swarm
  server.
  """
  add_trigger_options(parser)
  parser.add_option(
      '--task', nargs=4, action='append', default=[], dest='tasks',
      help='Task to trigger. The format is '
           '(hash|isolated, test_name, shards, test_filter). This may be '
           'used multiple times to send multiple jobs. If an isolated '
           'file is specified instead of a hash, it is first archived.')
  (options, args) = parser.parse_args(args)

  if args:
    parser.error('Unknown args: %s' % args)
  process_trigger_options(parser, options)
  if not options.tasks:
    parser.error('At least one --task is required.')

  try:
    return trigger(
        options.os,
        options.tasks,
        options.task_prefix,
        options.working_dir,
        options.isolate_server,
        options.swarming,
        options.verbose,
        options.profile,
        options.priority)
  except Failure as e:
    parser.error(e.args[0])


class OptionParserSwarming(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self, prog='swarming.py', **kwargs)
    self.add_option(
        '-S', '--swarming', default=SWARM_SERVER,
        help='Specify the url of the Swarming server, default: %default')

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    options.swarming = options.swarming.rstrip('/')
    if not options.swarming:
      self.error('--swarming is required.')
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    return dispatcher.execute(OptionParserSwarming(version=__version__), args)
  except (
      Failure,
      run_isolated.MappingError,
      run_isolated.ConfigError) as e:
    sys.stderr.write('\nError: ')
    sys.stderr.write(str(e))
    sys.stderr.write('\n')
    return 1


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))