#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Archives a set of files to a server."""

__version__ = '0.2'

import binascii
import cStringIO
import hashlib
import itertools
import logging
import os
import random
import re
import shutil
import sys
import threading
import time
import urllib
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import net
from utils import threading_utils
from utils import tools


# Default server.
# TODO(maruel): Chromium-specific.
ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'


# The minimum size of files to upload directly to the blobstore.
MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 1024

# The number of files to check for existence on the isolate server per
# /contains query. All files are sorted by likelihood of a change in the file
# content (currently file size is used to estimate this: the larger the file,
# the higher the chance it has changed). The first
# ITEMS_PER_CONTAINS_QUERIES[0] files are sent to '/contains', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
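# Illustrative schedule (file count is hypothetical): for 300 files sorted
# largest first, batch_files_for_check() below yields successive batches of
# 20, 20, 50, 50, 50 and 100 files, then the remaining 10 in a final partial
# batch; every batch past the sixth reuses the last value (100).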


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  pass


class MappingError(OSError):
  """Failed to recreate the tree."""
  pass


def randomness():
  """Generates low-entropy randomness for MIME encoding.

  Exists so it can be mocked out in unit tests.
  """
  return str(time.time())


def encode_multipart_formdata(fields, files,
                              mime_mapper=lambda _: 'application/octet-stream'):
  """Encodes a Multipart form data object.

  Args:
    fields: a sequence of (name, value) elements for
      regular form fields.
    files: a sequence of (name, filename, value) elements for data to be
      uploaded as files.
    mime_mapper: function to return the mime type from the filename.
  Returns:
    content_type: for httplib.HTTP instance
    body: for httplib.HTTP instance
  """
  boundary = hashlib.md5(randomness()).hexdigest()
  body_list = []
  for (key, value) in fields:
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"' % key)
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  for (key, filename, value) in files:
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(filename, unicode):
      filename = filename.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"; '
                     'filename="%s"' % (key, filename))
    body_list.append('Content-Type: %s' % mime_mapper(filename))
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  if body_list:
    body_list[-2] += '--'
  body = '\r\n'.join(body_list)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body
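# Illustrative call (values are made up; the boundary is an md5 hex digest
# derived from randomness() at call time):
#   content_type, body = encode_multipart_formdata(
#       [('token', 'abc123')], [('content', 'deadbeef', 'raw file bytes')])
#   # content_type -> 'multipart/form-data; boundary=<32 hex chars>'
#   # body -> CRLF-joined parts, each opened by '--<boundary>', with the final
#   #         delimiter suffixed by '--'.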


def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm."""
  size = 2 * algo().digest_size
  return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))


def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of the hashlib hashing algorithms.
  """
  digest = algo()
  with open(filepath, 'rb') as f:
    while True:
      # Read in 1mb chunks.
      chunk = f.read(1024*1024)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
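# Minimal sketch of how these helpers combine (the path is hypothetical):
#   digest = hash_file('/tmp/some_file', hashlib.sha1)  # 40 hex characters
#   assert is_valid_hash(digest, hashlib.sha1)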


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  with open(filepath, 'wb') as f:
    for d in content_generator:
      f.write(d)


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))
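# Example (hypothetical relative paths, as found in a .isolated 'files' dict):
#   create_directories('/tmp/run', ['a/b/c.bin', 'a/d.bin'])
#   # creates /tmp/run/a then /tmp/run/a/b; files at the root have an empty
#   # dirname, which is skipped.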


def create_links(base_directory, files):
  """Creates any links needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create junctions or empty text files similar to what
      # cygwin does?
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # symlink doesn't exist on Windows. So the 'link' property should
    # never be specified for Windows .isolated files.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    if 'm' in properties:
      lchmod = getattr(os, 'lchmod', None)
      if lchmod:
        lchmod(outfile, properties['m'])


def setup_commands(base_directory, cwd, cmd):
  """Correctly adjusts and then returns the required working directory
  and command needed to run the test.
  """
  assert not os.path.isabs(cwd), 'The cwd must be a relative path, got %s' % cwd
  cwd = os.path.join(base_directory, cwd)
  if not os.path.isdir(cwd):
    os.makedirs(cwd)

  # Ensure paths are correctly separated on windows.
  cmd[0] = cmd[0].replace('/', os.path.sep)
  cmd = tools.fix_python_path(cmd)

  return cwd, cmd
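# Example (hypothetical values):
#   cwd, cmd = setup_commands('/tmp/run', 'out/Release', ['./my_test', '--foo'])
#   # cwd -> '/tmp/run/out/Release' (created if missing); cmd[0] has '/'
#   # replaced by os.path.sep and tools.fix_python_path() applied to cmd.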


def generate_remaining_files(files):
  """Generates a dictionary of all the remaining files to be downloaded."""
  remaining = {}
  for filepath, props in files:
    if 'h' in props:
      remaining.setdefault(props['h'], []).append((filepath, props))

  return remaining


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return True
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


def try_remove(filepath):
  """Removes a file without crashing even if it doesn't exist."""
  try:
    os.remove(filepath)
  except OSError:
    pass


def url_read(url, **kwargs):
  """Wrapper around net.url_read() that raises MappingError when the server
  can't be reached.
  """
  result = net.url_read(url, **kwargs)
  if result is None:
    # If we get no response from the server, assume it is down and raise an
    # exception.
    raise MappingError('Unable to connect to server %s' % url)
  return result


class IsolateServer(object):
  """Client class to download or upload to Isolate Server."""
  def __init__(self, base_url, namespace):
    assert base_url.startswith('http'), base_url
    self.content_url = base_url.rstrip('/') + '/content/'
    self.namespace = namespace
    self._token = None
    self._lock = threading.Lock()

  @property
  def token(self):
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if not self._token:
        self._token = urllib.quote(url_read(self.content_url + 'get_token'))
      return self._token

  def fetch(self, item, size):
    """Fetches an object and yields its content."""
    zipped_url = '%sretrieve/%s/%s' % (self.content_url, self.namespace, item)
    logging.debug('download_file(%s)', zipped_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        zipped_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % zipped_url)

    decompressor = zlib.decompressobj()
    try:
      compressed_size = 0
      decompressed_size = 0
      while True:
        chunk = connection.read(ZIPPED_FILE_CHUNK)
        if not chunk:
          break
        compressed_size += len(chunk)
        decompressed = decompressor.decompress(chunk)
        decompressed_size += len(decompressed)
        yield decompressed

      # Ensure that all the data was properly decompressed.
      uncompressed_data = decompressor.flush()
      if uncompressed_data:
        raise IOError('Decompression failed')
      if size != UNKNOWN_FILE_SIZE and decompressed_size != size:
        raise IOError('File incorrect size after download of %s. Got %s and '
                      'expected %s' % (item, decompressed_size, size))
    except zlib.error as e:
      msg = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
          item, compressed_size, connection.content_length, e)
      logging.error(msg)

      # Testing seems to show that if a few machines are trying to download
      # the same blob, they can cause each other to fail. So if we hit a zip
      # error, this is the most likely cause (it only downloads some of the
      # data). Randomly sleep for between 5 and 25 seconds to try and spread
      # out the downloads.
      sleep_duration = (random.random() * 20) + 5
      time.sleep(sleep_duration)
      raise IOError(msg)

  def retrieve(self, item, dest, size):
    """Fetches an object and saves its content to |dest|."""
    try:
      file_write(dest, self.fetch(item, size))
    except IOError as e:
      # Remove unfinished download.
      try_remove(dest)
      logging.error('Failed to download %s at %s.\n%s', item, dest, e)
      raise

  def store(self, content, hash_key, _size):
    """Stores content on the server, using the blobstore for large payloads."""
    # TODO(maruel): Detect failures.
    hash_key = str(hash_key)
    if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
      return self._upload_hash_content_to_blobstore(hash_key, content)

    url = '%sstore/%s/%s?token=%s' % (
        self.content_url, self.namespace, hash_key, self.token)
    return url_read(
        url, data=content, content_type='application/octet-stream')

  def _upload_hash_content_to_blobstore(self, hash_key, content):
    """Uploads the content directly to the blobstore via a generated url."""
    # TODO(maruel): Support large files. This would require streaming support.
    gen_url = '%sgenerate_blobstore_url/%s/%s' % (
        self.content_url, self.namespace, hash_key)
    # Token is guaranteed to be already quoted but it is unnecessary here, and
    # only here.
    data = [('token', urllib.unquote(self.token))]
    content_type, body = encode_multipart_formdata(
        data, [('content', hash_key, content)])
    last_url = gen_url
    for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
      # Retry HTTP 50x here but not 404.
      upload_url = net.url_read(gen_url, data=data)
      if not upload_url:
        raise MappingError('Unable to connect to server %s' % gen_url)
      last_url = upload_url

      # Do not retry this request on HTTP 50x. Regenerate an upload url each
      # time since uploading "consumes" the upload url.
      result = net.url_read(
          upload_url, data=body, content_type=content_type, retry_50x=False)
      if result is not None:
        return result
    raise MappingError('Unable to connect to server %s' % last_url)
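# Sketch of typical client-side use (hash and destination are made up; the
# token request and HTTP transfers only happen when the methods are called):
#   server = IsolateServer(ISOLATE_SERVER, 'default-gzip')
#   server.retrieve('a' * 40, '/tmp/out.bin', UNKNOWN_FILE_SIZE)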


def check_files_exist_on_server(query_url, queries):
  """Queries the server to see which files from this batch already exist there.

  Arguments:
    queries: The files that may need to be uploaded to the server.
  Returns:
    missing_files: list of files that are missing on the server.
  """
  # TODO(maruel): Move inside IsolateServer.
  logging.info('Checking existence of %d files...', len(queries))
  body = ''.join(
      (binascii.unhexlify(meta_data['h']) for (_, meta_data) in queries))
  assert (len(body) % 20) == 0, repr(body)

  response = url_read(
      query_url, data=body, content_type='application/octet-stream')
  if len(queries) != len(response):
    raise MappingError(
        'Got an incorrect number of responses from the server. Expected %d, '
        'but got %d' % (len(queries), len(response)))

  missing_files = [
      queries[i] for i, flag in enumerate(response) if flag == chr(0)
  ]
  logging.info('Queried %d files, %d cache hit',
               len(queries), len(queries) - len(missing_files))
  return missing_files
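# Wire format sketch: the request body is the concatenation of the raw 20-byte
# SHA-1 digests (hence the "% 20" assert above) and the response carries one
# byte per query, chr(0) meaning "missing on the server". With hypothetical
# digests:
#   queries = [('a.txt', {'h': '61' * 20}), ('b.txt', {'h': '62' * 20})]
#   # body is 40 bytes; a response of '\x01\x00' marks only 'b.txt' as missing.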


class FileSystem(object):
  """Fetches data from the file system.

  The common use case is an NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """
  def __init__(self, base_path):
    self.base_path = base_path

  def fetch(self, item, size):
    source = os.path.join(self.base_path, item)
    if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
      raise IOError('Invalid file %s' % item)
    with open(source, 'rb') as f:
      return [f.read()]

  def retrieve(self, item, dest, size):
    source = os.path.join(self.base_path, item)
    if source == dest:
      logging.info('Source and destination are the same, no action required')
      return
    logging.debug('copy_file(%s, %s)', source, dest)
    if size != UNKNOWN_FILE_SIZE and not is_valid_file(source, size):
      raise IOError(
          'Invalid file %s, %d != %d' % (item, os.stat(source).st_size, size))
    shutil.copy(source, dest)

  def store(self, content, hash_key):
    raise NotImplementedError()


def get_storage_api(file_or_url, namespace):
  """Returns an object that implements .retrieve()."""
  if re.match(r'^https?://.+$', file_or_url):
    return IsolateServer(file_or_url, namespace)
  else:
    return FileSystem(file_or_url)
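# Dispatch example (locations are hypothetical):
#   get_storage_api('https://isolateserver-dev.appspot.com', 'default-gzip')
#   #   -> IsolateServer instance
#   get_storage_api('/mnt/isolate_files', 'default-gzip')
#   #   -> FileSystem instance (the namespace is ignored in that case)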


class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  RETRIES = 5

  def __init__(self, do_item):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')

    # Have .join() always return the keys, i.e. the first argument of each
    # task.
    def run(*args, **kwargs):
      do_item(*args, **kwargs)
      return args[0]
    self._do_item = run

  def add_item(self, priority, *args, **kwargs):
    """Adds a task to call do_item(*args, **kwargs)."""
    return self.add_task(priority, self._do_item, *args, **kwargs)
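# Sketch of intended use (do_item is any callable; its first positional
# argument doubles as the key returned by .join(); IOError failures are
# retried up to RETRIES times):
#   with WorkerPool(do_item) as pool:
#     pool.add_item(WorkerPool.MED, 'some-key', 'payload')
#     finished_keys = pool.join()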


def compression_level(filename):
  """Given a filename, calculates the ideal compression level to use."""
  # splitext() keeps the leading dot; strip it so the extension matches the
  # entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
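# e.g. compression_level('movie.avi') -> 0 while compression_level('main.cc')
# returns 7, since only known already-compressed extensions skip compression.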


def read_and_compress(filepath, level):
  """Reads a file and returns its content compressed with zlib."""
  compressor = zlib.compressobj(level)
  compressed_data = cStringIO.StringIO()
  with open(filepath, 'rb') as f:
    while True:
      chunk = f.read(ZIPPED_FILE_CHUNK)
      if not chunk:
        break
      compressed_data.write(compressor.compress(chunk))
  compressed_data.write(compressor.flush(zlib.Z_FINISH))
  value = compressed_data.getvalue()
  compressed_data.close()
  return value


def zip_and_trigger_upload(infile, metadata, upload_function):
  """Reads and compresses the file, then queues it for upload."""
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if not metadata['T']:
  compressed_data = read_and_compress(infile, compression_level(infile))
  priority = (
      WorkerPool.HIGH if metadata.get('priority', '1') == '0'
      else WorkerPool.MED)
  return upload_function(
      priority, compressed_data, metadata['h'], UNKNOWN_FILE_SIZE)


def batch_files_for_check(infiles):
  """Splits list of files to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server.

  Yields:
    batches: list of batches, each batch is a list of files.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  items = ((k, v) for k, v in infiles.iteritems() if 's' in v)
  for relfile, metadata in sorted(items, key=lambda x: -x[1]['s']):
    next_queries.append((relfile, metadata))
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


def get_files_to_upload(contains_hash_url, infiles):
  """Yields files that are missing on the server."""
  with threading_utils.ThreadPool(1, 16, 0, prefix='get_files_to_upload') as tp:
    for files in batch_files_for_check(infiles):
      tp.add_task(0, check_files_exist_on_server, contains_hash_url, files)
    for missing_file in itertools.chain.from_iterable(tp.iter_results()):
      yield missing_file


def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
      query if an element was already uploaded, and |base_url|/store/
      can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  logging.info('upload tree(base_url=%s, indir=%s, files=%d)' %
               (base_url, indir, len(infiles)))

  # Create a pool of workers to zip and upload any files missing from
  # the server.
  num_threads = threading_utils.num_processors()
  zipping_pool = threading_utils.ThreadPool(min(2, num_threads),
                                            num_threads, 0, 'zip')
  remote = IsolateServer(base_url, namespace)
  with WorkerPool(remote.store) as remote_uploader:
    # Starts the zip and upload process for files that are missing
    # from the server.
    contains_hash_url = '%scontains/%s?token=%s' % (
        remote.content_url, namespace, remote.token)
    uploaded = []
    for relfile, metadata in get_files_to_upload(contains_hash_url, infiles):
      infile = os.path.join(indir, relfile)
      zipping_pool.add_task(0, zip_and_trigger_upload, infile, metadata,
                            remote_uploader.add_item)
      uploaded.append((relfile, metadata))

    logging.info('Waiting for all files to finish zipping')
    zipping_pool.join()
    zipping_pool.close()
    logging.info('All files zipped.')

    logging.info('Waiting for all files to finish uploading')
    # Will raise if any exception occurred.
    remote_uploader.join()
  logging.info('All files are uploaded')

  total = len(infiles)
  total_size = sum(metadata.get('s', 0) for metadata in infiles.itervalues())
  logging.info(
      'Total: %6d, %9.1fkb',
      total,
      sum(m.get('s', 0) for m in infiles.itervalues()) / 1024.)
  cache_hit = set(infiles.iterkeys()) - set(x[0] for x in uploaded)
  cache_hit_size = sum(infiles[i].get('s', 0) for i in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = uploaded
  cache_miss_size = sum(infiles[i[0]].get('s', 0) for i in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)
  return 0


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server."""
  options, files = parser.parse_args(args)

  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    parser.error('Nothing to upload')
  if not options.isolate_server:
    parser.error('Nowhere to send. Please specify --isolate-server')

  # Load the necessary metadata. This is going to be rewritten eventually to be
  # more efficient.
  algo = hashlib.sha1
  infiles = dict(
      (
        f,
        {
          's': os.stat(f).st_size,
          'h': hash_file(f, algo),
        }
      )
      for f in files)

  with tools.Profiler('Archive'):
    ret = upload_tree(
        base_url=options.isolate_server,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  if not ret:
    print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
  return ret
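# Illustrative invocation (script name elided; 'archive' dispatches to
# CMDarchive via subcommand):
#   <this script> archive -I https://isolateserver-dev.appspot.com file1.bin
#   # on success, prints one "<sha1> <filename>" line per archived file.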


def CMDdownload(parser, args):
  """Download data from the server.

  It can download individual files.
  """
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not options.file:
    parser.error('At least one --file is required.')

  options.target = os.path.abspath(options.target)
  remote = IsolateServer(options.isolate_server, options.namespace)
  for h, dest in options.file:
    logging.info('%s: %s', h, dest)
    remote.retrieve(h, os.path.join(options.target, dest), UNKNOWN_FILE_SIZE)
  return 0
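# Illustrative invocation (hash and paths are hypothetical):
#   <this script> download -I https://isolateserver-dev.appspot.com \
#       --target /tmp/out --file <40-char sha1> foo.bin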


class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        default=ISOLATE_SERVER,
        metavar='URL',
        help='Isolate server where data is stored. default: %default')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    return dispatcher.execute(
        OptionParserIsolateServer(version=__version__), args)
  except (ConfigError, MappingError) as e:
    sys.stderr.write('\nError: ')
    sys.stderr.write(str(e))
    sys.stderr.write('\n')
    return 1


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))