blob: 24da5bcd6a6cbe708300c7f692df5c676c8445f9 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
2# Copyright (c) 2012 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
8import binascii
9import hashlib
10import logging
11import optparse
12import os
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000013import cStringIO
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import sys
15import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000016import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000017import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018
19import run_isolated
csharp@chromium.org07fa7592013-01-11 18:19:30 +000020import run_test_cases
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021
22
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023# The minimum size of files to upload directly to the blobstore.
maruel@chromium.orgaef29f82012-12-12 15:00:42 +000024MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
csharp@chromium.org07fa7592013-01-11 18:19:30 +000026# The number of files to check the isolate server for each query.
csharp@chromium.org20a888c2013-01-15 15:06:55 +000027ITEMS_PER_CONTAINS_QUERY = 500
csharp@chromium.org07fa7592013-01-11 18:19:30 +000028
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000029# A list of already compressed extension types that should not receive any
30# compression before being uploaded.
31ALREADY_COMPRESSED_TYPES = [
32 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
33 'wav', 'zip'
34]
35
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +000037def randomness():
38 """Generates low-entropy randomness for MIME encoding.
39
40 Exists so it can be mocked out in unit tests.
41 """
42 return str(time.time())
43
44
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000045def encode_multipart_formdata(fields, files,
46 mime_mapper=lambda _: 'application/octet-stream'):
47 """Encodes a Multipart form data object.
48
49 Args:
50 fields: a sequence (name, value) elements for
51 regular form fields.
52 files: a sequence of (name, filename, value) elements for data to be
53 uploaded as files.
54 mime_mapper: function to return the mime type from the filename.
55 Returns:
56 content_type: for httplib.HTTP instance
57 body: for httplib.HTTP instance
58 """
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +000059 boundary = hashlib.md5(randomness()).hexdigest()
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000060 body_list = []
61 for (key, value) in fields:
62 if isinstance(key, unicode):
63 value = key.encode('utf-8')
64 if isinstance(value, unicode):
65 value = value.encode('utf-8')
66 body_list.append('--' + boundary)
67 body_list.append('Content-Disposition: form-data; name="%s"' % key)
68 body_list.append('')
69 body_list.append(value)
70 body_list.append('--' + boundary)
71 body_list.append('')
72 for (key, filename, value) in files:
73 if isinstance(key, unicode):
74 value = key.encode('utf-8')
75 if isinstance(filename, unicode):
76 value = filename.encode('utf-8')
77 if isinstance(value, unicode):
78 value = value.encode('utf-8')
79 body_list.append('--' + boundary)
80 body_list.append('Content-Disposition: form-data; name="%s"; '
81 'filename="%s"' % (key, filename))
82 body_list.append('Content-Type: %s' % mime_mapper(filename))
83 body_list.append('')
84 body_list.append(value)
85 body_list.append('--' + boundary)
86 body_list.append('')
87 if body_list:
88 body_list[-2] += '--'
89 body = '\r\n'.join(body_list)
90 content_type = 'multipart/form-data; boundary=%s' % boundary
91 return content_type, body
92
93
maruel@chromium.org037758d2012-12-10 17:59:46 +000094def sha1_file(filepath):
95 """Calculates the SHA-1 of a file without reading it all in memory at once."""
96 digest = hashlib.sha1()
97 with open(filepath, 'rb') as f:
98 while True:
99 # Read in 1mb chunks.
100 chunk = f.read(1024*1024)
101 if not chunk:
102 break
103 digest.update(chunk)
104 return digest.hexdigest()
105
106
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000107def url_open(url, **kwargs):
108 result = run_isolated.url_open(url, **kwargs)
maruel@chromium.orgef333122013-03-12 20:36:40 +0000109 if not result:
110 # If we get no response from the server, assume it is down and raise an
111 # exception.
112 raise run_isolated.MappingError('Unable to connect to server %s' % url)
113 return result
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000114
115
maruel@chromium.orgdc359e62013-03-14 13:08:55 +0000116def upload_hash_content_to_blobstore(
117 generate_upload_url, data, hash_key, content):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000118 """Uploads the given hash contents directly to the blobsotre via a generated
119 url.
120
121 Arguments:
122 generate_upload_url: The url to get the new upload url from.
maruel@chromium.orgdc359e62013-03-14 13:08:55 +0000123 data: extra POST data.
124 hash_key: sha1 of the uncompressed version of content.
125 content: The contents to upload. Must fit in memory for now.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000126 """
127 logging.debug('Generating url to directly upload file to blobstore')
maruel@chromium.org92a3d2e2012-12-20 16:22:29 +0000128 assert isinstance(hash_key, str), hash_key
129 assert isinstance(content, str), (hash_key, content)
maruel@chromium.orgd58bf5b2013-04-26 17:57:42 +0000130 # TODO(maruel): Support large files. This would require streaming support.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000131 content_type, body = encode_multipart_formdata(
maruel@chromium.orgd58bf5b2013-04-26 17:57:42 +0000132 data, [('content', hash_key, content)])
133 for _ in range(run_isolated.MAX_URL_OPEN_ATTEMPTS):
134 # Retry HTTP 50x here.
135 response = run_isolated.url_open(generate_upload_url, data=data)
136 if not response:
137 raise run_isolated.MappingError(
138 'Unable to connect to server %s' % generate_upload_url)
139 upload_url = response.read()
140
141 # Do not retry this request on HTTP 50x. Regenerate an upload url each time
142 # since uploading "consumes" the upload url.
143 result = run_isolated.url_open(
144 upload_url, data=body, content_type=content_type, retry_50x=False)
145 if result:
146 return result.read()
147 raise run_isolated.MappingError(
148 'Unable to connect to server %s' % generate_upload_url)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000149
150
151class UploadRemote(run_isolated.Remote):
maruel@chromium.org034e3962013-03-13 13:34:25 +0000152 def __init__(self, namespace, base_url, token):
maruel@chromium.org21243ce2012-12-20 17:43:00 +0000153 self.namespace = str(namespace)
maruel@chromium.org034e3962013-03-13 13:34:25 +0000154 self._token = token
155 super(UploadRemote, self).__init__(base_url)
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000156
157 def get_file_handler(self, base_url):
maruel@chromium.org21243ce2012-12-20 17:43:00 +0000158 base_url = str(base_url)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000159 def upload_file(content, hash_key):
maruel@chromium.org034e3962013-03-13 13:34:25 +0000160 # TODO(maruel): Detect failures.
maruel@chromium.org21243ce2012-12-20 17:43:00 +0000161 hash_key = str(hash_key)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000162 content_url = base_url.rstrip('/') + '/content/'
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000163 if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
maruel@chromium.orgdc359e62013-03-14 13:08:55 +0000164 url = '%sgenerate_blobstore_url/%s/%s' % (
165 content_url, self.namespace, hash_key)
maruel@chromium.orge82112e2013-04-24 14:41:55 +0000166 # self._token is stored already quoted but it is unnecessary here, and
167 # only here.
168 data = [('token', urllib.unquote(self._token))]
maruel@chromium.orgdc359e62013-03-14 13:08:55 +0000169 upload_hash_content_to_blobstore(url, data, hash_key, content)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000170 else:
maruel@chromium.org034e3962013-03-13 13:34:25 +0000171 url = '%sstore/%s/%s?token=%s' % (
172 content_url, self.namespace, hash_key, self._token)
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000173 url_open(url, data=content, content_type='application/octet-stream')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000174 return upload_file
175
176
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000177def update_files_to_upload(query_url, queries, upload):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000178 """Queries the server to see which files from this batch already exist there.
179
180 Arguments:
181 queries: The hash files to potential upload to the server.
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000182 upload: Any new files that need to be upload are sent to this function.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000183 """
184 body = ''.join(
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000185 (binascii.unhexlify(meta_data['h']) for (_, meta_data) in queries))
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000186 assert (len(body) % 20) == 0, repr(body)
187
maruel@chromium.orgef333122013-03-12 20:36:40 +0000188 response = url_open(
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000189 query_url, data=body, content_type='application/octet-stream').read()
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000190 if len(queries) != len(response):
191 raise run_isolated.MappingError(
192 'Got an incorrect number of responses from the server. Expected %d, '
193 'but got %d' % (len(queries), len(response)))
194
195 hit = 0
196 for i in range(len(response)):
197 if response[i] == chr(0):
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000198 upload(queries[i])
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000199 else:
200 hit += 1
201 logging.info('Queried %d files, %d cache hit', len(queries), hit)
202
203
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000204def compression_level(filename):
205 """Given a filename calculates the ideal compression level to use."""
206 file_ext = os.path.splitext(filename)[1].lower()
207 # TODO(csharp): Profile to find what compression level works best.
208 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
209
210
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000211def read_and_compress(filepath, level):
212 """Reads a file and returns its content gzip compressed."""
213 compressor = zlib.compressobj(level)
214 compressed_data = cStringIO.StringIO()
215 with open(filepath, 'rb') as f:
216 while True:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000217 chunk = f.read(run_isolated.ZIPPED_FILE_CHUNK)
218 if not chunk:
219 break
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000220 compressed_data.write(compressor.compress(chunk))
221 compressed_data.write(compressor.flush(zlib.Z_FINISH))
222 value = compressed_data.getvalue()
223 compressed_data.close()
224 return value
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000225
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000226
227def zip_and_trigger_upload(infile, metadata, upload_function):
228 # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
229 # if not metadata['T']:
230 compressed_data = read_and_compress(infile, compression_level(infile))
231 priority = (
232 run_isolated.Remote.HIGH if metadata.get('priority', '1') == '0'
233 else run_isolated.Remote.MED)
234 return upload_function(priority, compressed_data, metadata['h'], None)
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000235
236
maruel@chromium.org35fc0c82013-01-17 15:14:14 +0000237def process_items(contains_hash_url, infiles, zip_and_upload):
238 """Generates the list of files that need to be uploaded and send them to
239 zip_and_upload.
240
241 Some may already be on the server.
242 """
243 next_queries = []
csharp@chromium.org90c45812013-01-23 14:27:21 +0000244 items = ((k, v) for k, v in infiles.iteritems() if 's' in v)
245 for relfile, metadata in sorted(items, key=lambda x: -x[1]['s']):
maruel@chromium.org35fc0c82013-01-17 15:14:14 +0000246 next_queries.append((relfile, metadata))
247 if len(next_queries) == ITEMS_PER_CONTAINS_QUERY:
248 update_files_to_upload(contains_hash_url, next_queries, zip_and_upload)
249 next_queries = []
250 if next_queries:
251 update_files_to_upload(contains_hash_url, next_queries, zip_and_upload)
252
253
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000254def upload_sha1_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000255 """Uploads the given tree to the given url.
256
257 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000258 base_url: The base url, it is assume that |base_url|/has/ can be used to
259 query if an element was already uploaded, and |base_url|/store/
260 can be used to upload a new element.
261 indir: Root directory the infiles are based in.
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000262 infiles: dict of files to upload files from |indir| to |base_url|.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000263 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000264 """
265 logging.info('upload tree(base_url=%s, indir=%s, files=%d)' %
266 (base_url, indir, len(infiles)))
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000267 assert base_url.startswith('http'), base_url
268 base_url = base_url.rstrip('/')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000269
maruel@chromium.org034e3962013-03-13 13:34:25 +0000270 # TODO(maruel): Make this request much earlier asynchronously while the files
271 # are being enumerated.
maruel@chromium.orge82112e2013-04-24 14:41:55 +0000272 token = urllib.quote(url_open(base_url + '/content/get_token').read())
maruel@chromium.org034e3962013-03-13 13:34:25 +0000273
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000274 # Create a pool of workers to zip and upload any files missing from
275 # the server.
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000276 num_threads = run_test_cases.num_processors()
277 zipping_pool = run_isolated.ThreadPool(num_threads, num_threads, 0)
maruel@chromium.org034e3962013-03-13 13:34:25 +0000278 remote_uploader = UploadRemote(namespace, base_url, token)
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000279
280 # Starts the zip and upload process for a given query. The query is assumed
281 # to be in the format (relfile, metadata).
csharp@chromium.org20a888c2013-01-15 15:06:55 +0000282 uploaded = []
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000283 def zip_and_upload(query):
284 relfile, metadata = query
285 infile = os.path.join(indir, relfile)
maruel@chromium.org831958f2013-01-22 15:01:46 +0000286 zipping_pool.add_task(0, zip_and_trigger_upload, infile, metadata,
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000287 remote_uploader.add_item)
csharp@chromium.org20a888c2013-01-15 15:06:55 +0000288 uploaded.append(query)
csharp@chromium.org07fa7592013-01-11 18:19:30 +0000289
maruel@chromium.org034e3962013-03-13 13:34:25 +0000290 contains_hash_url = '%s/content/contains/%s?token=%s' % (
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000291 base_url, namespace, token)
maruel@chromium.org35fc0c82013-01-17 15:14:14 +0000292 process_items(contains_hash_url, infiles, zip_and_upload)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000293
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000294 logging.info('Waiting for all files to finish zipping')
295 zipping_pool.join()
296 logging.info('All files zipped.')
297
298 logging.info('Waiting for all files to finish uploading')
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000299 # Will raise if any exception occurred.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000300 remote_uploader.join()
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000301 logging.info('All files are uploaded')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000302
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000303 total = len(infiles)
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000304 total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000305 logging.info(
306 'Total: %6d, %9.1fkb',
307 total,
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000308 sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
csharp@chromium.org20a888c2013-01-15 15:06:55 +0000309 cache_hit = set(infiles.iterkeys()) - set(x[0] for x in uploaded)
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000310 cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000311 logging.info(
312 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
313 len(cache_hit),
314 cache_hit_size / 1024.,
315 len(cache_hit) * 100. / total,
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000316 cache_hit_size * 100. / total_size if total_size else 0)
csharp@chromium.org20a888c2013-01-15 15:06:55 +0000317 cache_miss = uploaded
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000318 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000319 logging.info(
320 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
321 len(cache_miss),
322 cache_miss_size / 1024.,
323 len(cache_miss) * 100. / total,
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000324 cache_miss_size * 100. / total_size if total_size else 0)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000325 return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000326
327
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000328def main(args):
maruel@chromium.org46e61cc2013-03-25 19:55:34 +0000329 run_isolated.disable_buffering()
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000330 parser = optparse.OptionParser(
331 usage='%prog [options] <file1..fileN> or - to read from stdin',
332 description=sys.modules[__name__].__doc__)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000333 parser.add_option('-r', '--remote', help='Remote server to archive to')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000334 parser.add_option(
335 '-v', '--verbose',
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000336 action='count', default=0,
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000337 help='Use multiple times to increase verbosity')
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000338 parser.add_option('--namespace', default='default-gzip',
339 help='The namespace to use on the server.')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000340
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000341 options, files = parser.parse_args(args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000342
343 levels = [logging.ERROR, logging.INFO, logging.DEBUG]
344 logging.basicConfig(
345 level=levels[min(len(levels)-1, options.verbose)],
346 format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')
347 if files == ['-']:
348 files = sys.stdin.readlines()
349
350 if not files:
351 parser.error('Nothing to upload')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000352 if not options.remote:
353 parser.error('Nowhere to send. Please specify --remote')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000354
355 # Load the necessary metadata. This is going to be rewritten eventually to be
356 # more efficient.
357 infiles = dict(
358 (
359 f,
360 {
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000361 's': os.stat(f).st_size,
maruel@chromium.org037758d2012-12-10 17:59:46 +0000362 'h': sha1_file(f),
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000363 }
364 )
365 for f in files)
366
367 with run_isolated.Profiler('Archive'):
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000368 return upload_sha1_tree(
369 base_url=options.remote,
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000370 indir=os.getcwd(),
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000371 infiles=infiles,
372 namespace=options.namespace)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000373
374
375if __name__ == '__main__':
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000376 sys.exit(main(sys.argv[1:]))