blob: 07b22b79135504ba1f100e20d40e137170a842a7 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002# Copyright 2013 The Chromium Authors. All rights reserved.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00003# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000010import binascii
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +000011import cStringIO
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import hashlib
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +000013import itertools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000016import random
17import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000019import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000021import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000028from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000029from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000030from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000031
32
# Default server.
# TODO(maruel): Chromium-specific.
ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'


# The minimum size of files to upload directly to the blobstore.
MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024

# The number of files to check against the isolate server per /contains query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/contains', then
# the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a
# trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousands files to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
  '7z',
  'avi',
  'cur',
  'gif',
  'h264',
  'jar',
  'jpeg',
  'jpg',
  'pdf',
  'png',
  'wav',
  'zip',
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024


# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
DOWNLOAD_READ_TIMEOUT = 60
79
80
class ConfigError(ValueError):
  """Raised when a .isolated file can't be loaded."""
84
85
class MappingError(OSError):
  """Raised when the tree can't be recreated."""
89
90
def randomness():
  """Returns the current time as a string, a low-entropy seed for MIME encoding.

  Kept as a separate function so unit tests can mock it out.
  """
  now = time.time()
  return str(now)
97
98
def encode_multipart_formdata(fields, files,
                              mime_mapper=lambda _: 'application/octet-stream'):
  """Encodes a Multipart form data object.

  Unicode keys, file names and values are encoded as UTF-8 before being
  written into the body.

  Args:
    fields: a sequence of (name, value) elements for regular form fields.
    files: a sequence of (name, filename, value) elements for data to be
           uploaded as files.
    mime_mapper: function to return the mime type from the filename.
  Returns:
    content_type: for httplib.HTTP instance
    body: for httplib.HTTP instance
  """
  boundary = hashlib.md5(randomness()).hexdigest()
  body_list = []
  for (key, value) in fields:
    # Encode each element into itself; the key must not clobber the value.
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"' % key)
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  for (key, filename, value) in files:
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(filename, unicode):
      filename = filename.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"; '
                     'filename="%s"' % (key, filename))
    body_list.append('Content-Type: %s' % mime_mapper(filename))
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  if body_list:
    # The second to last entry is the final boundary line; the trailing '--'
    # turns it into the closing delimiter.
    body_list[-2] += '--'
  body = '\r\n'.join(body_list)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body
146
147
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  |algo| is a hashlib-style constructor; the expected length is the hex length
  of its digest. Only hexadecimal characters are accepted.
  """
  size = 2 * algo().digest_size
  # \Z anchors at the very end of the string. '$' would also match right before
  # a trailing newline and let 'deadbeef...\n' through.
  return bool(re.match(r'^[a-fA-F0-9]{%d}\Z' % size, value))
152
153
def hash_file(filepath, algo):
  """Returns the hex digest of a file, read in DISK_FILE_CHUNK sized pieces.

  The file is never loaded in memory all at once. |algo| should be one of
  hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    # iter() with a sentinel stops at the first empty read, i.e. at EOF.
    for block in iter(lambda: stream.read(DISK_FILE_CHUNK), b''):
      hasher.update(block)
  return hasher.hexdigest()
167
168
def file_read(filepath):
  """Yields the content of a file in chunks of at most DISK_FILE_CHUNK bytes."""
  with open(filepath, 'rb') as stream:
    data = stream.read(DISK_FILE_CHUNK)
    while data:
      yield data
      data = stream.read(DISK_FILE_CHUNK)
177
178
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  # A bare file name has no directory component; os.makedirs('') would raise.
  if filedir and not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000197
198
def create_directories(base_directory, files):
  """Creates under base_directory every directory the given files live in.

  Parent directories are created before their children; os.mkdir() is used, so
  a directory that already exists raises OSError.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each file's directory along with all of its ancestors.
  needed = set()
  for path in files:
    parent = os.path.dirname(path)
    while parent:
      needed.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees a parent is listed before its subdirectories.
  for relative_dir in sorted(needed):
    os.mkdir(os.path.join(base_directory, relative_dir))
211
212
def create_links(base_directory, files):
  """Creates the symlinks described by the given (filepath, properties) pairs.

  Only entries with a 'l' (link target) property are processed. On win32 they
  are skipped with a warning. When a 'm' (mode) property is present and the
  platform provides os.lchmod, the mode is applied to the link itself.
  """
  for filepath, properties in files:
    if 'l' in properties:
      if sys.platform == 'win32':
        # TODO(maruel): Create junctions or empty text files similar to what
        # cygwin do?
        logging.warning('Ignoring symlink %s', filepath)
        continue
      outfile = os.path.join(base_directory, filepath)
      # symlink doesn't exist on Windows. So the 'link' property should
      # never be specified for windows .isolated file.
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
      if 'm' in properties:
        # lchmod is not available on every platform.
        lchmod = getattr(os, 'lchmod', None)
        if lchmod:
          lchmod(outfile, properties['m'])
231
232
def setup_commands(base_directory, cwd, cmd):
  """Returns the adjusted (working directory, command) pair to run the test.

  |cwd| must be relative; it is joined to |base_directory| and created if
  missing. |cmd| has its first element modified in place.
  """
  assert not os.path.isabs(cwd), 'The cwd must be a relative path, got %s' % cwd
  workdir = os.path.join(base_directory, cwd)
  if not os.path.isdir(workdir):
    os.makedirs(workdir)

  # Ensure paths are correctly separated on windows.
  executable = cmd[0]
  cmd[0] = executable.replace('/', os.path.sep)
  return workdir, tools.fix_python_path(cmd)
247
248
def generate_remaining_files(files):
  """Groups the files that have a hash, keyed by that hash.

  Returns a dict mapping each 'h' value to the list of (filepath, props) pairs
  that carry it, in the order they were given. Entries without 'h' are ignored.
  """
  by_hash = {}
  for filepath, props in files:
    if 'h' not in props:
      continue
    digest = props['h']
    if digest not in by_hash:
      by_hash[digest] = []
    by_hash[digest].append((filepath, props))
  return by_hash
257
258
def is_valid_file(filepath, size):
  """Returns True if the file on disk has the expected size.

  The size is the only property checked. When |size| is UNKNOWN_FILE_SIZE the
  file is not even looked at and is considered valid.
  """
  if size == UNKNOWN_FILE_SIZE:
    return True
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
273
274
def try_remove(filepath):
  """Deletes a file, ignoring any OSError (e.g. when the file is missing)."""
  try:
    os.remove(filepath)
  except OSError:
    # Best effort; a failed removal is deliberately not reported.
    return
281
282
def url_read(url, **kwargs):
  """Wraps net.url_read() and raises MappingError when it returns None.

  All keyword arguments are forwarded to net.url_read() untouched.
  """
  result = net.url_read(url, **kwargs)
  if result is not None:
    return result
  # If we get no response from the server, assume it is down and raise an
  # exception.
  raise MappingError('Unable to connect to server %s' % url)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000290
291
class IsolateServer(object):
  """Client class to download or upload to Isolate Server."""
  def __init__(self, base_url, namespace):
    """Stores the namespace and derives the '/content/' url from base_url."""
    assert base_url.startswith('http'), base_url
    self.content_url = base_url.rstrip('/') + '/content/'
    self.namespace = namespace
    # Lazily fetched by the .token property; guarded by self._lock.
    self._token = None
    self._lock = threading.Lock()

  @property
  def token(self):
    """Returns the url-quoted token, requesting it on first access."""
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if not self._token:
        self._token = urllib.quote(url_read(self.content_url + 'get_token'))
      return self._token

  def fetch(self, item, expected_size):
    """Fetches an object and yields its content.

    The content is zlib-decompressed while it is downloaded. This is a
    generator, so nothing (including the argument checks) runs until it is
    iterated.

    Raises:
      IOError: the connection could not be opened, the stream is corrupted,
          data is left over after decompression or the decompressed size is not
          |expected_size|.
    """
    assert isinstance(item, basestring)
    assert (
        isinstance(expected_size, (int, long)) or
        expected_size == UNKNOWN_FILE_SIZE)
    zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
    logging.debug('download_file(%s)', zipped_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % zipped_url)

    # TODO(maruel): Must only decompress when needed.
    decompressor = zlib.decompressobj()
    try:
      compressed_size = 0
      decompressed_size = 0
      while True:
        chunk = connection.read(ZIPPED_FILE_CHUNK)
        if not chunk:
          break
        compressed_size += len(chunk)
        decompressed = decompressor.decompress(chunk)
        decompressed_size += len(decompressed)
        yield decompressed

      # Ensure that all the data was properly decompressed.
      uncompressed_data = decompressor.flush()
      if uncompressed_data:
        raise IOError('Decompression failed')
      if (expected_size != UNKNOWN_FILE_SIZE and
          decompressed_size != expected_size):
        raise IOError('File incorrect size after download of %s. Got %s and '
                      'expected %s' % (item, decompressed_size, expected_size))
    except zlib.error as e:
      msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
          item, compressed_size, connection.content_length, e)
      logging.warning(msg)

      # Testing seems to show that if a few machines are trying to download
      # the same blob, they can cause each other to fail. So if we hit a zip
      # error, this is the most likely cause (it only downloads some of the
      # data). Randomly sleep for between 5 and 25 seconds to try and spread
      # out the downloads.
      sleep_duration = (random.random() * 20) + 5
      time.sleep(sleep_duration)
      raise IOError(msg)

  def push(self, item, expected_size, content_generator):
    """Uploads content generated by |content_generator| as |item| to the remote
    isolate server.

    The whole content is joined in memory first. Content larger than
    MIN_SIZE_FOR_DIRECT_BLOBSTORE goes through a generated blobstore url,
    anything else is sent to the 'store' url directly.
    """
    assert isinstance(item, basestring)
    # Accept long as well, like fetch() does; large sizes may be long.
    assert (
        isinstance(expected_size, (int, long)) or
        expected_size == UNKNOWN_FILE_SIZE)
    item = str(item)
    # TODO(maruel): Support large files. This would require streaming support.
    content = ''.join(content_generator)
    if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
      return self._upload_hash_content_to_blobstore(item, content)

    url = '%sstore/%s/%s?token=%s' % (
        self.content_url, self.namespace, item, self.token)
    return url_read(
        url, data=content, content_type='application/octet-stream')

  def _upload_hash_content_to_blobstore(self, item, content):
    """Uploads the content directly to the blobstore via a generated url.

    Raises:
      MappingError: an upload url could not be generated, or every attempt to
          upload failed.
    """
    # TODO(maruel): Support large files. This would require streaming support.
    gen_url = '%sgenerate_blobstore_url/%s/%s' % (
        self.content_url, self.namespace, item)
    # Token is guaranteed to be already quoted but it is unnecessary here, and
    # only here.
    data = [('token', urllib.unquote(self.token))]
    content_type, body = encode_multipart_formdata(
        data, [('content', item, content)])
    # Only used to name the last url tried in the final error message.
    last_url = gen_url
    for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
      # Retry HTTP 50x here but not 404.
      upload_url = net.url_read(gen_url, data=data)
      if not upload_url:
        raise MappingError('Unable to connect to server %s' % gen_url)
      last_url = upload_url

      # Do not retry this request on HTTP 50x. Regenerate an upload url each
      # time since uploading "consumes" the upload url.
      result = net.url_read(
          upload_url, data=body, content_type=content_type, retry_50x=False)
      if result is not None:
        return result
    raise MappingError('Unable to connect to server %s' % last_url)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000405
406
def check_files_exist_on_server(query_url, queries):
  """Asks the server which files of this batch it doesn't have yet.

  The request body is the concatenation of the binary form of each entry's
  'h' hash; the response holds one character per entry, chr(0) meaning missing.

  Arguments:
    query_url: url the hashes are posted to.
    queries: list of (path, metadata) pairs that may need to be uploaded.
  Returns:
    missing_files: list of the entries of |queries| that are missing on the
        server.
  """
  # TODO(maruel): Move inside IsolateServer.
  logging.info('Checking existence of %d files...', len(queries))
  digests = [binascii.unhexlify(meta_data['h']) for _, meta_data in queries]
  body = ''.join(digests)
  assert (len(body) % 20) == 0, repr(body)

  response = url_read(
      query_url, data=body, content_type='application/octet-stream')
  if len(queries) != len(response):
    raise MappingError(
        'Got an incorrect number of responses from the server. Expected %d, '
        'but got %d' % (len(queries), len(response)))

  # Both sequences have the same length at this point, so pair them up.
  missing_files = [
    query for query, flag in zip(queries, response) if flag == chr(0)
  ]
  hits = len(queries) - len(missing_files)
  logging.info('Queried %d files, %d cache hit', len(queries), hits)
  return missing_files
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000434
435
class FileSystem(object):
  """Fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """
  def __init__(self, base_path):
    """|base_path| is the directory items are read from and written to."""
    self.base_path = base_path

  def fetch(self, item, expected_size):
    """Returns a generator yielding the content of |item|.

    Raises:
      IOError: |expected_size| is known and the file has a different size.
    """
    assert isinstance(item, basestring)
    # Accept long as well, like IsolateServer.fetch(); sizes may be long.
    assert (
        isinstance(expected_size, (int, long)) or
        expected_size == UNKNOWN_FILE_SIZE)
    source = os.path.join(self.base_path, item)
    if (expected_size != UNKNOWN_FILE_SIZE and
        not is_valid_file(source, expected_size)):
      raise IOError('Invalid file %s' % item)
    return file_read(source)

  def push(self, item, expected_size, content_generator):
    """Writes the content generated by |content_generator| as |item|.

    Raises:
      IOError: |expected_size| is known and a different number of bytes was
          written; the file is deleted first.
    """
    assert isinstance(item, basestring)
    assert (
        isinstance(expected_size, (int, long)) or
        expected_size == UNKNOWN_FILE_SIZE)
    dest = os.path.join(self.base_path, item)
    total = file_write(dest, content_generator)
    if expected_size != UNKNOWN_FILE_SIZE and total != expected_size:
      # Do not leave a file of the wrong size behind.
      os.remove(dest)
      raise IOError(
          'Invalid file %s, %d != %d' % (item, total, expected_size))
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000463
464
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements .fetch() and .push().

  An http(s) url gets an IsolateServer; anything else is treated as a path and
  gets a FileSystem, in which case |namespace| is unused.
  """
  if re.match(r'^https?://.+$', file_or_url):
    return IsolateServer(file_or_url, namespace)
  return FileSystem(file_or_url)
471
472
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """AutoRetryThreadPool preconfigured to retry on IOError, with the 'remote'
  prefix.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Passed to AutoRetryThreadPool right after the list of exceptions.
  RETRIES = 5

  def __init__(self):
    retried_exceptions = [IOError]
    super(WorkerPool, self).__init__(
        retried_exceptions,
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000490
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000491
def compression_level(filename):
  """Given a filename calculates the ideal compression level to use.

  Returns 0 (no compression) when the extension is listed in
  ALREADY_COMPRESSED_TYPES, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.jpg') while the entries of
  # ALREADY_COMPRESSED_TYPES have none ('jpg'), so strip it before the lookup.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
497
498
def read_and_compress(filepath, level):
  """Reads a file and returns its content zlib compressed at |level|.

  The file is read in pieces of ZIPPED_FILE_CHUNK bytes; the compressed output
  is returned as a single string.
  """
  compressor = zlib.compressobj(level)
  pieces = []
  with open(filepath, 'rb') as f:
    chunk = f.read(ZIPPED_FILE_CHUNK)
    while chunk:
      pieces.append(compressor.compress(chunk))
      chunk = f.read(ZIPPED_FILE_CHUNK)
  # Z_FINISH terminates the stream and returns whatever is still buffered.
  pieces.append(compressor.flush(zlib.Z_FINISH))
  return b''.join(pieces)
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000513
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000514
def zip_and_trigger_upload(infile, metadata, add_item):
  """Compresses |infile| and hands the result over to |add_item|.

  |add_item| is called with (priority, hash, [compressed content]) and its
  return value is returned. The priority is WorkerPool.HIGH when
  metadata['priority'] is '0', WorkerPool.MED otherwise.
  """
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if not metadata['T']:
  # TODO(maruel): Use a generator?
  compressed_data = read_and_compress(infile, compression_level(infile))
  if metadata.get('priority', '1') == '0':
    priority = WorkerPool.HIGH
  else:
    priority = WorkerPool.MED
  return add_item(priority, metadata['h'], [compressed_data])
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +0000524
525
def batch_files_for_check(infiles):
  """Splits list of files to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server. Only
  entries that have a size ('s') are considered, largest first. The size of the
  Nth batch is ITEMS_PER_CONTAINS_QUERIES[N]; the last value is reused once the
  list is exhausted.

  Yields:
    batches: each batch is a list of (relfile, metadata) pairs.
  """
  sized = [(k, v) for k, v in infiles.iteritems() if 's' in v]
  sized.sort(key=lambda x: -x[1]['s'])
  last_index = len(ITEMS_PER_CONTAINS_QUERIES) - 1
  batch_count = 0
  batch = []
  for relfile, metadata in sized:
    batch.append((relfile, metadata))
    limit = ITEMS_PER_CONTAINS_QUERIES[min(batch_count, last_index)]
    if len(batch) == limit:
      yield batch
      batch = []
      batch_count += 1
  # Whatever is left did not fill a whole batch.
  if batch:
    yield batch
548
549
def get_files_to_upload(contains_hash_url, infiles):
  """Yields the (relfile, metadata) entries that are missing on the server.

  One check_files_exist_on_server() task is queued per batch on a dedicated
  thread pool; entries are yielded as the results come in.
  """
  with threading_utils.ThreadPool(1, 16, 0, prefix='get_files_to_upload') as tp:
    for files in batch_files_for_check(infiles):
      tp.add_task(0, check_files_exist_on_server, contains_hash_url, files)
    # Each result is the list of missing files of one batch.
    for missing_batch in tp.iter_results():
      for missing_file in missing_batch:
        yield missing_file
maruel@chromium.org35fc0c82013-01-17 15:14:14 +0000557
558
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Files the server reports as missing are compressed and pushed; cache hit and
  cache miss statistics are logged at the end.

  Arguments:
    base_url:  The base url of the storage, handed to get_storage_api(). The
               '/contains' query url is built from the storage's content_url.
    indir:     Root directory the infiles are based in.
    infiles:   dict of files to upload from |indir| to |base_url|, mapping each
               relative path to its metadata dict.
    namespace: The namespace to use on the server.

  Returns:
    0, always.
  """
  logging.info(
      'upload tree(base_url=%s, indir=%s, files=%d)',
      base_url, indir, len(infiles))

  # Create a pool of workers to zip and upload any files missing from
  # the server.
  num_threads = threading_utils.num_processors()
  remote = get_storage_api(base_url, namespace)
  # TODO(maruel): There's three separate thread pools here, it is not very
  # efficient. remote_uploader and get_files_to_upload() should share the same
  # pool and control priorities accordingly.
  uploaded = []
  with WorkerPool() as remote_uploader:
    # Starts the zip and upload process for files that are missing
    # from the server.
    # TODO(maruel): Move .contains() to the API.
    contains_hash_url = '%scontains/%s?token=%s' % (
        remote.content_url, namespace, remote.token)

    def add_item(priority, item, content_generator):
      # The content is compressed, so its final size is not the one recorded in
      # the metadata; push it without a size check.
      remote_uploader.add_task(
          priority, remote.push, item, UNKNOWN_FILE_SIZE, content_generator)

    with threading_utils.ThreadPool(
        min(2, num_threads), num_threads, 0, 'zip') as zipping_pool:
      for relfile, metadata in get_files_to_upload(contains_hash_url, infiles):
        infile = os.path.join(indir, relfile)
        zipping_pool.add_task(
            0, zip_and_trigger_upload, infile, metadata, add_item)
        uploaded.append((relfile, metadata))

      logging.info('Waiting for all files to finish zipping')
      zipping_pool.join()
    logging.info('All files zipped.')
    remote_uploader.join()
  logging.info('All files are uploaded')

  def percent(part, whole):
    # An empty tree has nothing to compare against; report 0 instead of
    # dividing by zero.
    return part * 100. / whole if whole else 0

  total = len(infiles)
  total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
  logging.info(
      'Total: %6d, %9.1fkb',
      total,
      total_size / 1024.)
  # Everything that was not uploaded was already on the server.
  cache_hit = set(infiles.iterkeys()) - set(x[0] for x in uploaded)
  cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      percent(len(cache_hit), total),
      percent(cache_hit_size, total_size))
  cache_miss = uploaded
  cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      percent(len(cache_miss), total),
      percent(cache_miss_size, total_size))
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000629
630
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  The files to archive are the positional arguments. When the only positional
  argument is '-', the list of files is read from stdin instead, one path per
  line.

  Each file is stat'ed and hashed, then the whole set is handed to
  upload_tree() with the current working directory as the base directory. When
  upload_tree() returns a falsy value, one '<hash> <path>' line is printed per
  file, sorted by path.

  Arguments:
    parser: option parser; its parse_args() result must provide
            isolate_server and namespace.
    args: command line arguments of the subcommand.

  Returns:
    The value returned by upload_tree().
  """
  options, files = parser.parse_args(args)

  if files == ['-']:
    # Lines read from stdin still carry their line terminator. Strip it so the
    # path given to os.stat() and hash_file() is the actual file name, and drop
    # blank lines since they do not name a file.
    stripped = [line.rstrip('\n\r') for line in sys.stdin.readlines()]
    files = [path for path in stripped if path]

  if not files:
    parser.error('Nothing to upload')
  if not options.isolate_server:
    parser.error('Nowhere to send. Please specify --isolate-server')

  # Load the necessary metadata. This is going to be rewritten eventually to be
  # more efficient.
  algo = hashlib.sha1
  infiles = dict(
      (
        f,
        {
          's': os.stat(f).st_size,
          'h': hash_file(f, algo),
        }
      )
      for f in files)

  with tools.Profiler('Archive'):
    ret = upload_tree(
        base_url=options.isolate_server,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  if not ret:
    # Single parenthesized argument: prints the same text whether print is a
    # statement or a function.
    print('\n'.join('%s  %s' % (infiles[f]['h'], f) for f in sorted(infiles)))
  return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +0000666
667
def CMDdownload(parser, args):
  """Download data from the server.

  It can download individual files.

  Each --file option takes a hash and a destination. The content fetched for
  the hash is written to the destination, joined onto the --target directory
  (which defaults to the current working directory).

  Arguments:
    parser: option parser; --file and --target are registered on it here, and
            its parse_args() result must provide isolate_server and namespace.
    args: command line arguments of the subcommand; positional arguments are
          rejected.

  Returns:
    0 once every requested file has been written.
  """
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not options.file:
    # --file is the only way to select what to download.
    parser.error('--file is required.')

  options.target = os.path.abspath(options.target)
  remote = get_storage_api(options.isolate_server, options.namespace)
  for h, dest in options.file:
    logging.info('%s: %s', h, dest)
    file_write(
        os.path.join(options.target, dest),
        remote.fetch(h, UNKNOWN_FILE_SIZE))
  return 0
693
694
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that adds the --isolate-server and --namespace options."""

  def __init__(self, **kwargs):
    """Registers the server options on top of the base parser's options."""
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I',
        '--isolate-server',
        metavar='URL',
        default=ISOLATE_SERVER,
        help='Isolate server where data is stored. default: %default')
    self.add_option(
        '--namespace',
        default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses the arguments and normalizes the server URL.

    Trailing '/' characters are removed from options.isolate_server; the parser
    errors out when nothing is left of the URL afterwards.

    Returns:
      Tuple (options, args) as produced by the base parser.
    """
    base = tools.OptionParserWithLogging
    options, remaining = base.parse_args(self, *args, **kwargs)
    server_url = options.isolate_server.rstrip('/')
    options.isolate_server = server_url
    if not server_url:
      self.error('--isolate-server is required.')
    return options, remaining
714
715
def main(args):
  """Dispatches args to the matching subcommand of this module.

  ConfigError and MappingError raised while doing so are reported on stderr
  instead of propagating.

  Returns:
    The value returned by the dispatcher, or 1 when one of these errors was
    reported.
  """
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    parser = OptionParserIsolateServer(version=__version__)
    exit_code = dispatcher.execute(parser, args)
  except (ConfigError, MappingError) as error:
    sys.stderr.write('\nError: ')
    sys.stderr.write(str(error))
    sys.stderr.write('\n')
    exit_code = 1
  return exit_code
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000726
727
if __name__ == '__main__':
  # Process-wide initialization is only done when run as a script, and always
  # before the command line is dispatched.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  exit_code = main(sys.argv[1:])
  sys.exit(exit_code)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +0000732 sys.exit(main(sys.argv[1:]))