blob: 3b489ffacef59c2b476a3a418b147d9a98057171 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
2# Copyright (c) 2012 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
8import binascii
9import hashlib
10import logging
11import optparse
12import os
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000013import cStringIO
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import sys
15import time
16import urllib2
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000017import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018
19import run_isolated
20
21
# How many times a single request is attempted before giving up on it.
MAX_UPLOAD_ATTEMPTS = 5

# Content strictly larger than this size is uploaded directly to the blobstore
# instead of going through the regular store url.
MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 8

# Extensions of file types that are already compressed and therefore should
# not be compressed again before being uploaded.
ALREADY_COMPRESSED_TYPES = [
  '7z',
  'avi',
  'cur',
  'gif',
  'h264',
  'jar',
  'jpeg',
  'jpg',
  'pdf',
  'png',
  'wav',
  'zip',
]
34
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000035
def encode_multipart_formdata(fields, files,
                              mime_mapper=lambda _: 'application/octet-stream'):
  """Encodes a multipart/form-data request body.

  Text (unicode) names, filenames and values are encoded to utf-8; byte
  strings are used as-is.

  Args:
    fields: a sequence of (name, value) elements for regular form fields.
    files: a sequence of (name, filename, value) elements for data to be
           uploaded as files.
    mime_mapper: function to return the mime type from the filename. It is
                 called with the filename exactly as the caller passed it.

  Returns:
    content_type: value to use for the request's Content-Type header.
    body: the encoded request body as a byte string; empty when both |fields|
          and |files| are empty.
  """
  # type(u'') is `unicode` on python 2 and `str` on python 3, so the check
  # below selects text objects on either interpreter.
  text_type = type(u'')

  def to_bytes(item):
    """Returns |item| utf-8 encoded if it is text, unchanged otherwise."""
    return item.encode('utf-8') if isinstance(item, text_type) else item

  boundary = hashlib.md5(str(time.time()).encode('utf-8')).hexdigest()
  delimiter = b'--' + boundary.encode('ascii')

  body_list = []
  for key, value in fields:
    # Each element is encoded on its own; the payload must never be replaced
    # by the encoded name.
    body_list.extend([
        delimiter,
        b'Content-Disposition: form-data; name="%s"' % to_bytes(key),
        b'',
        to_bytes(value),
    ])
  for key, filename, value in files:
    body_list.extend([
        delimiter,
        b'Content-Disposition: form-data; name="%s"; filename="%s"' % (
            to_bytes(key), to_bytes(filename)),
        b'Content-Type: %s' % to_bytes(mime_mapper(filename)),
        b'',
        to_bytes(value),
    ])
  if body_list:
    # A single closing delimiter terminates the whole body. Emitting a
    # delimiter after every part would insert empty parts between entries.
    body_list.append(delimiter + b'--')
    body_list.append(b'')

  body = b'\r\n'.join(body_list)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body
83
84
85def gen_url_request(url, payload, content_type='application/octet-stream'):
86 """Returns a POST request."""
87 request = urllib2.Request(url, data=payload)
88 if payload is not None:
89 request.add_header('Content-Type', content_type)
90 request.add_header('Content-Length', len(payload))
91 return request
92
93
def url_open(url, data, content_type='application/octet-stream'):
  """Opens the given url with the given data, repeating up to
  MAX_UPLOAD_ATTEMPTS times if it encounters an error.

  Arguments:
    url: The url to open.
    data: The data to send to the url, or None to send no payload.
    content_type: The Content-Type header to send along with |data|.

  Returns:
    The response from the url.

  Raises:
    run_isolated.MappingError if every attempt failed with urllib2.URLError.
  """
  request = gen_url_request(url, data, content_type)
  last_error = None
  for attempt in range(MAX_UPLOAD_ATTEMPTS):
    try:
      return urllib2.urlopen(request)
    except urllib2.URLError as e:
      last_error = e
      logging.warning('Unable to connect to %s, error msg: %s', url, e)
      # Wait a little longer before each retry. There is nothing to wait for
      # after the final attempt, so fail right away in that case.
      if attempt + 1 < MAX_UPLOAD_ATTEMPTS:
        time.sleep(0.5 + attempt)

  # If we get no response from the server after max_retries, assume it
  # is down and raise an exception
  raise run_isolated.MappingError(
      'Unable to connect to server, %s, to see which files are presents: %s' %
        (url, last_error))
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000121
122
def upload_hash_content_to_blobstore(generate_upload_url, hash_key, content):
  """Uploads |content| directly to the blobstore via a generated url.

  Arguments:
    generate_upload_url: The url to get the new upload url from.
    hash_key: Used as the filename of the uploaded multipart file entry.
    content: The contents to upload.
  """
  logging.debug('Generating url to directly upload file to blobstore')
  response = url_open(generate_upload_url, None)
  upload_url = response.read()

  if upload_url:
    file_entry = ('hash_contents', hash_key, content)
    content_type, body = encode_multipart_formdata([], [file_entry])
    url_open(upload_url, body, content_type)
  else:
    # Nothing can be uploaded without a target url.
    logging.error('Unable to generate upload url')
141
142
class UploadRemote(run_isolated.Remote):
  """A run_isolated.Remote whose file handler uploads into |namespace|."""

  def __init__(self, namespace, *args, **kwargs):
    super(UploadRemote, self).__init__(*args, **kwargs)
    # Namespace inserted in every upload url.
    self.namespace = namespace

  def get_file_handler(self, base_url):
    """Returns a function(content, hash_key) that uploads |content|."""
    def upload_file(content, hash_key):
      content_url = base_url.rstrip('/') + '/content/'
      if len(content) <= MIN_SIZE_FOR_DIRECT_BLOBSTORE:
        # Small enough: post the content to the store url.
        store_url = content_url + 'store/' + self.namespace + '/' + hash_key
        url_open(store_url, content)
        return

      # Larger content goes straight to the blobstore.
      generate_url = (
          content_url + 'generate_blobstore_url/' + self.namespace + '/' +
          hash_key)
      upload_hash_content_to_blobstore(generate_url, hash_key, content)
    return upload_file
161
162
def update_files_to_upload(query_url, queries, files_to_upload):
  """Asks the server which files of this batch it does not have yet.

  Arguments:
    query_url: The url the concatenated binary hashes are sent to.
    queries: The (file, metadata) items to potentially upload to the server;
             metadata['h'] is the hex encoded hash.
    files_to_upload: Every item of |queries| that needs to be uploaded is
                     appended to this list.
  """
  binary_hashes = [binascii.unhexlify(entry[1]['h']) for entry in queries]
  body = ''.join(binary_hashes)
  assert (len(body) % 20) == 0, repr(body)

  response = url_open(query_url, body).read()
  if len(queries) != len(response):
    raise run_isolated.MappingError(
        'Got an incorrect number of responses from the server. Expected %d, '
        'but got %d' % (len(queries), len(response)))

  # The server answers with one character per queried hash; chr(0) marks a
  # hash that is missing on the server.
  missing = [
      query for query, present in zip(queries, response) if present == chr(0)
  ]
  files_to_upload.extend(missing)
  hit = len(queries) - len(missing)
  logging.info('Queried %d files, %d cache hit', len(queries), hit)
188
189
def compression_level(filename):
  """Given a filename calculates the ideal compression level to use.

  Returns 0 (no compression) for files whose extension is listed in
  ALREADY_COMPRESSED_TYPES, 7 otherwise.
  """
  # os.path.splitext() keeps the leading '.' on the extension while
  # ALREADY_COMPRESSED_TYPES lists bare extensions, so drop it before the
  # lookup; otherwise no file would ever match.
  file_ext = os.path.splitext(filename)[1].lower()[1:]
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
195
196
def zip_and_trigger_upload(infile, metadata, upload_function):
  """Compresses |infile| with zlib and hands the result to |upload_function|.

  Arguments:
    infile: Path of the file to compress.
    metadata: Dict describing the file; 'h' is passed on as the third argument
              of |upload_function| and a 'priority' of '0' selects
              run_isolated.Remote.HIGH instead of run_isolated.Remote.MED.
    upload_function: Called as
                     upload_function(priority, compressed_data, hash, None).
  """
  zipper = zlib.compressobj(compression_level(infile))
  zipped = cStringIO.StringIO()
  with open(infile, 'rb') as src:
    # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
    # The disabled loop condition was: and not metadata['T'].
    for chunk in iter(lambda: src.read(run_isolated.ZIPPED_FILE_CHUNK), ''):
      zipped.write(zipper.compress(chunk))

  zipped.write(zipper.flush(zlib.Z_FINISH))
  if metadata.get('priority', '1') == '0':
    priority = run_isolated.Remote.HIGH
  else:
    priority = run_isolated.Remote.MED
  upload_function(priority, zipped.getvalue(), metadata['h'], None)
  zipped.close()
215
216
217def upload_sha1_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000218 """Uploads the given tree to the given url.
219
220 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000221 base_url: The base url, it is assume that |base_url|/has/ can be used to
222 query if an element was already uploaded, and |base_url|/store/
223 can be used to upload a new element.
224 indir: Root directory the infiles are based in.
225 infiles: dict of files to map from |indir| to |outdir|.
226 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000227 """
228 logging.info('upload tree(base_url=%s, indir=%s, files=%d)' %
229 (base_url, indir, len(infiles)))
230
231 # Generate the list of files that need to be uploaded (since some may already
232 # be on the server.
233 base_url = base_url.rstrip('/')
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000234 contains_hash_url = base_url + '/content/contains/' + namespace
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000235 to_upload = []
236 next_queries = []
237 for relfile, metadata in infiles.iteritems():
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000238 if 'l' in metadata:
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000239 # Skip links when uploading.
240 continue
241
242 next_queries.append((relfile, metadata))
243 if len(next_queries) == 1000:
244 update_files_to_upload(contains_hash_url, next_queries, to_upload)
245 next_queries = []
246
247 if next_queries:
248 update_files_to_upload(contains_hash_url, next_queries, to_upload)
249
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000250 # Zip the required files and then upload them.
251 # TODO(csharp): use num_processors().
252 zipping_pool = run_isolated.ThreadPool(num_threads=4)
253 remote_uploader = UploadRemote(namespace, base_url)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000254 for relfile, metadata in to_upload:
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000255 infile = os.path.join(indir, relfile)
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000256 zipping_pool.add_task(zip_and_trigger_upload, infile, metadata,
257 remote_uploader.add_item)
258 logging.info('Waiting for all files to finish zipping')
259 zipping_pool.join()
260 logging.info('All files zipped.')
261
262 logging.info('Waiting for all files to finish uploading')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000263 remote_uploader.join()
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000264 logging.info('All files are uploaded')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000265
266 exception = remote_uploader.next_exception()
267 if exception:
268 raise exception[0], exception[1], exception[2]
269 total = len(infiles)
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000270 total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000271 logging.info(
272 'Total: %6d, %9.1fkb',
273 total,
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000274 sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000275 cache_hit = set(infiles.iterkeys()) - set(x[0] for x in to_upload)
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000276 cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000277 logging.info(
278 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
279 len(cache_hit),
280 cache_hit_size / 1024.,
281 len(cache_hit) * 100. / total,
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000282 cache_hit_size * 100. / total_size if total_size else 0)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000283 cache_miss = to_upload
maruel@chromium.orge5c17132012-11-21 18:18:46 +0000284 cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000285 logging.info(
286 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
287 len(cache_miss),
288 cache_miss_size / 1024.,
289 len(cache_miss) * 100. / total,
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000290 cache_miss_size * 100. / total_size if total_size else 0)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000291
292
def main():
  """Parses the command line and archives the listed files to the server.

  The files are taken from the command line, or read one per line from stdin
  when the only argument is '-'. Returns 0 on success; invalid usage exits
  through parser.error().
  """
  parser = optparse.OptionParser(
      usage='%prog [options] <file1..fileN> or - to read from stdin',
      description=sys.modules[__name__].__doc__)
  # TODO(maruel): Support both NFS and isolateserver.
  parser.add_option('-o', '--outdir', help='Remote server to archive to')
  parser.add_option(
      '-v', '--verbose',
      action='count', default=0,
      help='Use multiple times to increase verbosity')
  parser.add_option('--namespace', default='default-gzip',
                    help='The namespace to use on the server.')

  options, files = parser.parse_args()

  levels = [logging.ERROR, logging.INFO, logging.DEBUG]
  logging.basicConfig(
      level=levels[min(len(levels)-1, options.verbose)],
      format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')
  if files == ['-']:
    # Drop the line terminators (they are not part of the file names) and
    # ignore blank lines.
    files = [line.rstrip('\r\n') for line in sys.stdin]
    files = [f for f in files if f]

  if not files:
    parser.error('Nothing to upload')
  if not options.outdir:
    parser.error('Nowhere to send. Please specify --outdir')

  def sha1_of(path):
    """Returns the hex sha-1 of the file content, read in binary mode."""
    with open(path, 'rb') as f:
      return hashlib.sha1(f.read()).hexdigest()

  # Load the necessary metadata. This is going to be rewritten eventually to be
  # more efficient.
  infiles = {}
  for f in files:
    infiles[f] = {
      's': os.stat(f).st_size,
      'h': sha1_of(f),
    }

  with run_isolated.Profiler('Archive'):
    upload_sha1_tree(
        base_url=options.outdir,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  return 0
339
340
# Script entry point: the value returned by main() becomes the exit code.
if __name__ == '__main__':
  sys.exit(main())