#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.4'

import base64
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are
# a trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass
115
116
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000117def stream_read(stream, chunk_size):
118 """Reads chunks from |stream| and yields them."""
119 while True:
120 data = stream.read(chunk_size)
121 if not data:
122 break
123 yield data
124
125
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400126def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800127 """Yields file content in chunks of |chunk_size| starting from |offset|."""
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000128 with open(filepath, 'rb') as f:
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800129 if offset:
130 f.seek(offset)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000131 while True:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000132 data = f.read(chunk_size)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000133 if not data:
134 break
135 yield data
136
137
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000138def file_write(filepath, content_generator):
139 """Writes file content as generated by content_generator.
140
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000141 Creates the intermediary directory as needed.
142
143 Returns the number of bytes written.
144
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000145 Meant to be mocked out in unit tests.
146 """
147 filedir = os.path.dirname(filepath)
148 if not os.path.isdir(filedir):
149 os.makedirs(filedir)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000150 total = 0
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000151 with open(filepath, 'wb') as f:
152 for d in content_generator:
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000153 total += len(d)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000154 f.write(d)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000155 return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000156
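
# Illustrative usage sketch, not part of the original module (the paths below
# are hypothetical): file_read() and file_write() compose into a streaming
# copy that never holds the whole file in memory.
#
#   n = file_write('/tmp/dst/copy.bin', file_read('/tmp/src/a.bin'))
#
# file_write() creates the missing '/tmp/dst' directory as needed and returns
# the number of bytes written.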

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension can
  # match the dot-less entries in ALREADY_COMPRESSED_TYPES (without this, the
  # lookup never matched and everything was compressed at level 7).
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
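
# For example (illustrative): get_zip_compression_level('photo.jpg') returns 0
# because 'jpg' is listed in ALREADY_COMPRESSED_TYPES, while
# get_zip_compression_level('trace.txt') returns 7.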

def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories that will need to be created.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]

class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32-bit userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns a URL that can be used to fetch given item once it's uploaded.

    Note that if the namespace uses compression, data at the given URL is
    compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
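
# Typical end-to-end upload flow (a sketch, not part of the module; the server
# URL and file paths are hypothetical):
#
#   storage_api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with Storage(storage_api) as storage:
#     uploaded = storage.upload_items(
#         [FileItem('/tmp/a.bin'), FileItem('/tmp/b.bin')])
#   # Only the items missing from the server were actually pushed.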
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
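
# Worked example (illustrative): with ITEMS_PER_CONTAINS_QUERIES ==
# (20, 20, 50, 50, 50, 100), a 1000-item list is queried as batches of
# 20, 20, 50, 50, 50 and then repeated batches of 100, largest items first.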

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
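
# Download-side sketch (illustrative; 'storage' and 'cache' stand for any
# Storage and LocalCache instances, and the digest is made up):
#
#   fetch_queue = FetchQueue(storage, cache)
#   digest = 'deadbeef' * 5  # hypothetical sha-1 hex digest
#   fetch_queue.add(digest, size=1024)
#   fetch_queue.wait((digest,))  # blocks until the item is in the cache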

class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # For DB uploads.
    content = response.get('content')
    if content is not None:
      return base64.b64decode(content)

    # For GS entities.
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is specified, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it cannot be rewound back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001123
  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded dict with the list of items to query.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url

    # Response body lists the missing files along with their upload tickets.
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hits',
        len(items), len(items) - len(missing_items))
    return missing_items
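
  # Example of the /preupload exchange (a sketch: the real response may carry
  # additional fields). Suppose two items are queried and only the second one
  # is missing from the server:
  #
  #   request body:
  #     {'items': [{'digest': 'aaa...', 'is_isolated': False, 'size': 123},
  #                {'digest': 'bbb...', 'is_isolated': True, 'size': 456}],
  #      'namespace': {...}}
  #   response body:
  #     {'items': [{'index': '1', 'upload_ticket': '...'}]}
  #
  # contains() then returns a dict mapping the second Item to its freshly
  # created _IsolateServerPushState.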

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # Upload to the isolate server datastore.
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # Upload to Google Storage.
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        url=url)
    return response is not None


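# The following standalone sketch illustrates the memory throttling pattern
# used by the push operation above: uploads proceed only while the sum of
# their in-memory payload sizes stays under a budget, with an exception for a
# single oversized upload when nothing else is in flight. The budget value and
# payload sizes below are made up for illustration; this helper is not part of
# the module's API.
def _example_memory_throttle_sketch():
  budget = 64 * 1024 * 1024  # 64mb for this sketch.
  lock = threading.Lock()
  in_use = [0]  # Mutable cell so the worker closure can update it.

  def upload(size):
    # Block until the payload fits in the budget, mirroring the loop in the
    # push operation above.
    while True:
      with lock:
        if ((size >= budget and not in_use[0]) or
            (in_use[0] + size <= budget)):
          in_use[0] += size
          break
      time.sleep(0.1)
    try:
      pass  # The actual network PUT would happen here.
    finally:
      with lock:
        in_use[0] -= size

  threads = [
      threading.Thread(target=upload, args=(s,))
      for s in (16 * 1024 * 1024, 32 * 1024 * 1024, 128 * 1024 * 1024)
  ]
  for t in threads:
    t.start()
  for t in threads:
    t.join()

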
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
        all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no file node to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode & self._file_mode_mask)


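# A minimal usage sketch for MemoryCache; the digest and content are made up
# for illustration, and this helper is not part of the module's API.
def _example_memory_cache_usage():
  cache = MemoryCache()
  with cache:
    cache.write('deadbeef', ['hello ', 'world'])  # Chunks are joined.
    assert cache.read('deadbeef') == 'hello world'
    assert 'deadbeef' in cache.cached_set()
    cache.evict('deadbeef')

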
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, the cache may unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


1341class DiskCache(LocalCache):
1342 """Stateful LRU cache in a flat hash table in a directory.
1343
1344 Saves its state as json file.
1345 """
1346 STATE_FILE = 'state.json'
1347
1348 def __init__(self, cache_dir, policies, hash_algo):
1349 """
1350 Arguments:
1351 cache_dir: directory where to place the cache.
1352 policies: cache retention policies.
1353 algo: hashing algorithm used.
1354 """
1355 super(DiskCache, self).__init__()
1356 self.cache_dir = cache_dir
1357 self.policies = policies
1358 self.hash_algo = hash_algo
1359 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1360
1361 # All protected methods (starting with '_') except _path should be called
1362 # with this lock locked.
1363 self._lock = threading_utils.LockWithAssert()
1364 self._lru = lru.LRUDict()
1365
1366 # Profiling values.
1367 self._added = []
1368 self._removed = []
1369 self._free_disk = 0
1370
1371 with tools.Profiler('Setup'):
1372 with self._lock:
1373 self._load()
1374
  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) removed',
          len(self._removed), sum(self._removed) / 1024)
      logging.info(
          '        %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def touch(self, digest, size):
    """Verifies an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
    return True

  def evict(self, digest):
    with self._lock:
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def read(self, digest):
    with open(self._path(digest), 'rb') as f:
      return f.read()

  def write(self, digest, content):
    assert content is not None
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have its
    # write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      # 1) Inside the |content| generator in case of network or unzipping
      #    errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)

  def hardlink(self, digest, dest, file_mode):
    """Hardlinks the file to |dest|.

    Note that the file permission bits are on the file node, not the directory
    entry, so changing the access bit on any of the directory entries for the
    file node will affect them all.
    """
    path = self._path(digest)
    if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
      # Report to the server that it failed with more details. We'll want to
      # squash them all.
      on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))

    if file_mode is not None:
      # Ignores all other bits.
      os.chmod(dest, file_mode & 0500)

  def _load(self):
    """Loads state of the cache from json file."""
    self._lock.assert_locked()

    if not os.path.isdir(self.cache_dir):
      os.makedirs(self.cache_dir)
    else:
      # Make sure the cache is read-only.
      # TODO(maruel): Calculate the cost and optimize the performance
      # accordingly.
      file_path.make_tree_read_only(self.cache_dir)

    # Load state of the cache.
    if os.path.isfile(self.state_file):
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)

    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    unknown = []
    for filename in os.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        continue
      if filename in previous:
        previous.remove(filename)
        continue
      # An untracked file.
      if not isolated_format.is_valid_hash(filename, self.hash_algo):
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if os.path.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            pass
        else:
          file_path.try_remove(p)
        continue
      # File that's not referenced in 'state.json'.
      # TODO(vadimsh): Verify its SHA1 matches file name.
      logging.warning('Adding unknown file %s to cache', filename)
      unknown.append(filename)

    if unknown:
      # Add as oldest files. They will be deleted eventually if not accessed.
      self._add_oldest_list(unknown)
      logging.warning('Added back %d unknown files', len(unknown))

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)
    self._trim()

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if os.path.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if os.path.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know about and ensures enough free space."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file()

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self._remove_lru_file()
      self._free_disk = file_path.get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if self.policies.max_cache_size:
        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    digest, size = self._lru.pop_oldest()
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = os.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)

  def _add_oldest_list(self, digests):
    """Adds a bunch of items into LRU cache marking them as oldest ones."""
    self._lock.assert_locked()
    pairs = []
    for digest in digests:
      size = os.stat(self._path(digest)).st_size
      self._added.append(size)
      pairs.append((digest, size))
    self._lru.batch_insert_oldest(pairs)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = os.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._removed.append(size)
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


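# A minimal usage sketch for DiskCache; the cache path, limits and namespace
# below are illustrative, and this helper is not part of the module's API.
def _example_disk_cache_usage():
  policies = CachePolicies(
      max_cache_size=20 * 1024 * 1024 * 1024,  # 20gb.
      min_free_space=2 * 1024 * 1024 * 1024,  # 2gb.
      max_items=100000)
  cache = DiskCache(
      unicode(os.path.abspath('cache')),
      policies,
      isolated_format.get_hash_algo('default-gzip'))
  with cache:
    # On exit the cache trims itself according to |policies|.
    print len(cache.cached_set())

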
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
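    # Worked example (hypothetical files): if the root A.isolated includes
    # [B, C] and B.isolated includes [D], the *.isolated files themselves are
    # fetched as soon as they are discovered, but their data files are
    # processed in pre-order traversal: A, B, D, C. Since A comes first, a
    # file defined in both A and C is fetched once, with A's properties.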
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))


def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skipping duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)


def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueued and there's
        # no easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

      # The cache could have evicted some items we just tried to fetch; that's
      # a fatal error.
      if not fetch_queue.verify_all_cached():
        raise isolated_format.MappingError(
            'Cache is too small to hold all requested files')
  return bundle


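# A minimal usage sketch for fetch_isolated; the server URL, namespace, hash
# and output directory are placeholders, and this helper is not part of the
# module's API.
def _example_fetch_isolated_usage():
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    bundle = fetch_isolated(
        isolated_hash='0123456789abcdef0123456789abcdef01234567',
        storage=storage,
        cache=MemoryCache(),
        outdir=unicode(os.path.abspath('out')),
        require_command=False)
    if bundle.command:
      print ' '.join(bundle.command)

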
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
    FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated'))
    for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


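# A minimal usage sketch for directory_to_metadata; the directory path,
# namespace and blacklist pattern are illustrative, and this helper is not
# part of the module's API.
def _example_directory_to_metadata_usage():
  items, metadata = directory_to_metadata(
      unicode(os.path.abspath('src')),
      isolated_format.get_hash_algo('default-gzip'),
      tools.gen_blacklist([r'.*\.pyc$']))
  # |items| are FileItem's ready to upload; |metadata| is the 'files' dict of
  # the corresponding .isolated file.
  for item in items:
    print item.path

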
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded, but in
    # practice we don't.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir and os.path.isdir(tempdir):
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each processed file with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)
  if options.isolated:
    if (os.path.isfile(options.target) or
        (os.path.isdir(options.target) and os.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        # |rel| is already rooted at |options.target|.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until the DiskCache() instance is
    # created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))