#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to a server."""

__version__ = '0.3.4'

import functools
import json
import logging
import os
import re
import shutil
import sys
import tempfile
import threading
import time
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format

# TODO(maruel): Temporary to make the next code migration simpler.
from isolated_format import IsolatedError, MappingError, UNKNOWN_FILE_SIZE

# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'

# The number of files to check the isolate server for per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files to
# look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
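# Worked example (illustration only, derived from batch_items_for_check()
# below): checking 300 items produces batches of sizes
#   [20, 20, 50, 50, 50, 100, 10]
# i.e. the schedule above followed by one final partial batch.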


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediate directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


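# A minimal usage sketch (illustration only; 'src' and 'dst' are hypothetical
# paths): file_read() and file_write() compose into a chunked file copy that
# never holds the whole file in memory.
#
#   bytes_copied = file_write(dst, file_read(src))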
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


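# Round-trip sketch (illustration only, using in-memory chunks): the output of
# zip_compress() can be fed straight back into zip_decompress() to recover the
# original data.
#
#   chunks = ['foo', 'bar']
#   compressed = list(zip_compress(iter(chunks)))
#   assert ''.join(zip_decompress(iter(compressed))) == 'foobar'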
def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


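# Example (illustration only): for files ['a/b/c.h', 'a/d.cc'],
# create_directories() creates 'a' and then 'a/b' under |base_directory|;
# sorted() guarantees parents are created before their children.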
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == isolated_format.UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


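# Usage sketch (illustration only; assumes hashlib.sha1 as the hash algorithm):
# prepare() lazily fills in digest and size from content().
#
#   import hashlib
#   item = BufferItem('some data')
#   item.prepare(hashlib.sha1)
#   print item.digest, item.size  # hex SHA-1 of 'some data', 9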
class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = is_namespace_with_compression(storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of the backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is the
    URL of the isolate server; for FileSystem it is a path in the file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns a URL that can be used to fetch the given item once uploaded.

    Note that if the namespace uses compression, data at the given URL is
    compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


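# End-to-end upload sketch (illustration only; the server URL and file path are
# hypothetical): Storage wraps a StorageApi and uploads only what the server
# reports as missing.
#
#   server = IsolateServer('https://isolate.example.com', 'default')
#   with Storage(server) as storage:
#     uploaded = storage.upload_items([FileItem('/tmp/foo.bin')])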
def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(self, digest, size=UNKNOWN_FILE_SIZE, priority=WorkerPool.MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


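# Download-side sketch (illustration only; 'storage', 'cache' and 'digest' are
# assumed to already exist): FetchQueue pipelines fetches from Storage into a
# LocalCache and lets callers block on individual digests.
#
#   queue = FetchQueue(storage, cache)
#   queue.add(digest, size=1024, priority=WorkerPool.HIGH)
#   queue.wait([digest])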
class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions,
    since otherwise Storage would retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != isolated_format.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """Location of the backing store that this class is using.

    Exact meaning depends on the type. For IsolateServer it is the URL of the
    isolate server; for FileSystem it is a path in the file system.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with the given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url):
    self.upload_url = upload_url
    self.finalize_url = finalize_url
    self.uploaded = False
    self.finalized = False


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

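  # Example of a successful handshake response, as consumed by
  # _validate_handshake_response() above (field values are illustrative only):
  #
  #   {
  #     'access_token': 'deadbeef...',
  #     'protocol_version': '1.0',
  #     'server_app_version': '1234-abcdef',
  #   }
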
  @property
  def _server_capabilities(self):
    """Performs the handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by the /handshake endpoint.

    Raises:
      MappingError if the server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print the exception class
          # name as well.
          raise MappingError(
              'Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
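      # For example, resuming at offset 1024 of a 4096 byte file would give
      # 'bytes 1024-4095/4096' (or 'bytes 1024-4095/*' if the total size is
      # unknown).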
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept proper data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    logging.info('Checking existence of %d files...', len(items))

    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise MappingError('Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items


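# Illustrative /pre-upload exchange (values are made up): the request body sent
# by IsolateServer.contains() and a matching response.
#
#   request:  [{'h': 'abc...', 's': 1024, 'i': 0}, {'h': 'def...', 's': 10, 'i': 1}]
#   response: [null, ['https://upload-url...', 'https://finalize-url...']]
#
# A null entry means the item is already on the server; a two-element list
# carries the upload and finalize URLs consumed by IsolateServer.push().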
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001140class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001141 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001142
1143 The common use case is a NFS/CIFS file server that is mounted locally that is
1144 used to fetch the file on a local partition.
1145 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001146
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001147 # Used for push_state instead of None. That way caller is forced to
1148 # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1149 _DUMMY_PUSH_STATE = object()
1150
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001151 def __init__(self, base_path, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001152 super(FileSystem, self).__init__()
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001153 self._base_path = base_path
1154 self._namespace = namespace
1155
1156 @property
1157 def location(self):
1158 return self._base_path
1159
1160 @property
1161 def namespace(self):
1162 return self._namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001163
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001164 def get_fetch_url(self, digest):
1165 return None
1166
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001167 def fetch(self, digest, offset=0):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001168 assert isinstance(digest, basestring)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001169 return file_read(os.path.join(self._base_path, digest), offset=offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001170
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001171 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001172 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001173 assert item.digest is not None
1174 assert item.size is not None
1175 assert push_state is self._DUMMY_PUSH_STATE
1176 content = item.content() if content is None else content
1177 if isinstance(content, basestring):
1178 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1179 content = [content]
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001180 file_write(os.path.join(self._base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001181
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001182 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001183 assert all(i.digest is not None and i.size is not None for i in items)
1184 return dict(
1185 (item, self._DUMMY_PUSH_STATE) for item in items
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001186 if not os.path.exists(os.path.join(self._base_path, item.digest))
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001187 )
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001188
1189
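# A minimal sketch of using FileSystem directly, assuming a hypothetical
# '/mnt/isolate-cache' mount point and a digest/size precomputed by the caller
# with the namespace's hash algorithm. Normal callers go through get_storage().
def _example_file_system_roundtrip(path, digest, size):
  """Pushes one local file to a file system store and reads it back."""
  fs = FileSystem('/mnt/isolate-cache', 'default')
  item = FileItem(path=path, digest=digest, size=size, high_priority=False)
  # contains() returns only the missing items, with a push_state attached.
  for missing, push_state in fs.contains([item]).iteritems():
    fs.push(missing, push_state)
  # fetch() yields the stored content in chunks.
  return ''.join(fs.fetch(digest))

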
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001190class LocalCache(object):
1191 """Local cache that stores objects fetched via Storage.
1192
1193 It can be accessed concurrently from multiple threads, so it should protect
1194 its internal state with some lock.
1195 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001196 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001197
1198 def __enter__(self):
1199 """Context manager interface."""
1200 return self
1201
1202 def __exit__(self, _exc_type, _exec_value, _traceback):
1203 """Context manager interface."""
1204 return False
1205
1206 def cached_set(self):
1207 """Returns a set of all cached digests (always a new object)."""
1208 raise NotImplementedError()
1209
1210 def touch(self, digest, size):
1211 """Ensures item is not corrupted and updates its LRU position.
1212
1213 Arguments:
1214 digest: hash digest of item to check.
1215 size: expected size of this item.
1216
1217 Returns:
1218 True if item is in cache and not corrupted.
1219 """
1220 raise NotImplementedError()
1221
1222 def evict(self, digest):
1223 """Removes item from cache if it's there."""
1224 raise NotImplementedError()
1225
1226 def read(self, digest):
1227 """Returns contents of the cached item as a single str."""
1228 raise NotImplementedError()
1229
1230 def write(self, digest, content):
1231 """Reads data from |content| generator and stores it in cache."""
1232 raise NotImplementedError()
1233
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001234 def hardlink(self, digest, dest, file_mode):
1235 """Ensures file at |dest| has same content as cached |digest|.
1236
1237 If file_mode is provided, it is used to set the executable bit if
1238 applicable.
1239 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001240 raise NotImplementedError()
1241
1242
1243class MemoryCache(LocalCache):
1244 """LocalCache implementation that stores everything in memory."""
1245
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001246 def __init__(self, file_mode_mask=0500):
1247 """Args:
1248      file_mode_mask: bit mask to AND file mode with. The default value makes
1249        all mapped files read-only.
1250 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001251 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001252 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001253 # Let's not assume dict is thread safe.
1254 self._lock = threading.Lock()
1255 self._contents = {}
1256
1257 def cached_set(self):
1258 with self._lock:
1259 return set(self._contents)
1260
1261 def touch(self, digest, size):
1262 with self._lock:
1263 return digest in self._contents
1264
1265 def evict(self, digest):
1266 with self._lock:
1267 self._contents.pop(digest, None)
1268
1269 def read(self, digest):
1270 with self._lock:
1271 return self._contents[digest]
1272
1273 def write(self, digest, content):
1274 # Assemble whole stream before taking the lock.
1275 data = ''.join(content)
1276 with self._lock:
1277 self._contents[digest] = data
1278
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001279 def hardlink(self, digest, dest, file_mode):
1280 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001281 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001282 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001283 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001284
1285
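# A small sketch exercising the LocalCache interface through MemoryCache. The
# digest below is a placeholder rather than a real hash of the content, and
# |dest_path| is wherever the caller wants the file materialized.
def _example_memory_cache(dest_path):
  """Writes an entry, checks it and materializes it at |dest_path|."""
  digest = '0123456789abcdef0123456789abcdef01234567'
  with MemoryCache() as cache:
    # write() consumes an iterable of str chunks.
    cache.write(digest, ['hello ', 'world'])
    assert cache.touch(digest, size=11)
    assert cache.read(digest) == 'hello world'
    assert digest in cache.cached_set()
    # hardlink() writes the content to |dest_path| and applies the file mode,
    # ANDed with the default 0500 file_mode_mask.
    cache.hardlink(digest, dest_path, 0644)
    cache.evict(digest)

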
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001286def is_namespace_with_compression(namespace):
1287 """Returns True if given |namespace| stores compressed objects."""
1288 return namespace.endswith(('-gzip', '-deflate'))
1289
1290
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001291def get_storage_api(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001292 """Returns an object that implements low-level StorageApi interface.
1293
1294  It is used by Storage to work with a single isolate |namespace|. It should
1295  rarely be used directly by clients; see 'get_storage' for a better
1296  alternative.
1297
1298 Arguments:
1299 file_or_url: a file path to use file system based storage, or URL of isolate
1300 service to use shared cloud based storage.
1301    namespace: isolate namespace to operate in; it also defines the hashing and
1302        compression scheme used, e.g. namespace names that end with '-gzip'
1303        store compressed data.
1304
1305 Returns:
1306 Instance of StorageApi subclass.
1307 """
Marc-Antoine Ruel37989932013-11-19 16:28:08 -05001308 if file_path.is_url(file_or_url):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001309 return IsolateServer(file_or_url, namespace)
1310 else:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001311 return FileSystem(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001312
1313
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001314def get_storage(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001315 """Returns Storage class that can upload and download from |namespace|.
1316
1317 Arguments:
1318 file_or_url: a file path to use file system based storage, or URL of isolate
1319 service to use shared cloud based storage.
1320    namespace: isolate namespace to operate in; it also defines the hashing and
1321        compression scheme used, e.g. namespace names that end with '-gzip'
1322        store compressed data.
1323
1324 Returns:
1325 Instance of Storage.
1326 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001327 return Storage(get_storage_api(file_or_url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001328
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001329
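# A minimal sketch of uploading one precomputed item through the Storage
# wrapper returned by get_storage(). The server URL and local directory are
# placeholders; |digest| and |size| are assumed to match the file content under
# the namespace's hash algorithm.
def _example_upload_one_item(path, digest, size, use_server):
  dest = 'https://isolate.example.com' if use_server else '/mnt/isolate-cache'
  namespace = 'default-gzip' if use_server else 'default'
  with get_storage(dest, namespace) as storage:
    item = FileItem(path=path, digest=digest, size=size, high_priority=False)
    storage.upload_items([item])

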
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001330def save_isolated(isolated, data):
1331 """Writes one or multiple .isolated files.
1332
1333  Note: this reference implementation does not create child .isolated files,
1334  so it always returns an empty list.
1335
1336 Returns the list of child isolated files that are included by |isolated|.
1337 """
1338 # Make sure the data is valid .isolated data by 'reloading' it.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001339 algo = isolated_format.SUPPORTED_ALGOS[data['algo']]
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001340 load_isolated(json.dumps(data), algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001341 tools.write_json(isolated, data, True)
1342 return []
1343
1344
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001345def upload_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001346 """Uploads the given tree to the given url.
1347
1348 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001349    base_url: The base url; it is assumed that |base_url|/has/ can be used to
1350 query if an element was already uploaded, and |base_url|/store/
1351 can be used to upload a new element.
1352 indir: Root directory the infiles are based in.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001353 infiles: dict of files to upload from |indir| to |base_url|.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001354 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001355 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001356 logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))
1357
1358 # Convert |indir| + |infiles| into a list of FileItem objects.
1359 # Filter out symlinks, since they are not represented by items on isolate
1360 # server side.
1361 items = [
1362 FileItem(
1363 path=os.path.join(indir, filepath),
1364 digest=metadata['h'],
1365 size=metadata['s'],
1366 high_priority=metadata.get('priority') == '0')
1367 for filepath, metadata in infiles.iteritems()
1368 if 'l' not in metadata
1369 ]
1370
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001371 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001372 storage.upload_items(items)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001373 return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001374
1375
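# A hypothetical |infiles| mapping illustrating the metadata keys upload_tree()
# expects: 'h' (digest) and 's' (size) for regular files, an optional
# 'priority' of '0' for high priority items, and 'l' for symlinks, which are
# skipped since they are not stored on the server. Digests, paths and the
# server URL are placeholders.
def _example_upload_tree(indir):
  infiles = {
      'data/payload.bin': {
          'h': '0123456789abcdef0123456789abcdef01234567',
          's': 1048576,
      },
      'run_test.isolated': {
          'h': '76543210fedcba9876543210fedcba9876543210',
          's': 512,
          'priority': '0',
      },
      'payload_link.bin': {'l': 'data/payload.bin'},
  }
  return upload_tree(
      'https://isolate.example.com', indir, infiles, 'default-gzip')

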
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001376def load_isolated(content, algo):
maruel@chromium.org41601642013-09-18 19:40:46 +00001377  """Verifies the .isolated content is valid and returns the parsed json
1378  data.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001379
1380 Arguments:
1381 - content: raw serialized content to load.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001382 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1383 algorithm used on the Isolate Server.
maruel@chromium.org41601642013-09-18 19:40:46 +00001384 """
1385 try:
1386 data = json.loads(content)
1387 except ValueError:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001388 raise IsolatedError('Failed to parse: %s...' % content[:100])
maruel@chromium.org41601642013-09-18 19:40:46 +00001389
1390 if not isinstance(data, dict):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001391 raise IsolatedError('Expected dict, got %r' % data)
maruel@chromium.org41601642013-09-18 19:40:46 +00001392
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001393 # Check 'version' first, since it could modify the parsing after.
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001394 value = data.get('version', '1.0')
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001395 if not isinstance(value, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001396 raise IsolatedError('Expected string, got %r' % value)
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001397 try:
1398 version = tuple(map(int, value.split('.')))
1399 except ValueError:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001400 raise IsolatedError('Expected valid version, got %r' % value)
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001401
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001402 expected_version = tuple(
1403 map(int, isolated_format.ISOLATED_FILE_VERSION.split('.')))
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001404 # Major version must match.
1405 if version[0] != expected_version[0]:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001406 raise IsolatedError(
Marc-Antoine Ruel1c1edd62013-12-06 09:13:13 -05001407 'Expected compatible \'%s\' version, got %r' %
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001408 (isolated_format.ISOLATED_FILE_VERSION, value))
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001409
1410 if algo is None:
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001411 # TODO(maruel): Remove the default around Jan 2014.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001412    # Default to the algorithm used in the .isolated file itself; falls back to
1413 # 'sha-1' if unspecified.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001414 algo = isolated_format.SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001415
maruel@chromium.org41601642013-09-18 19:40:46 +00001416 for key, value in data.iteritems():
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001417 if key == 'algo':
1418 if not isinstance(value, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001419 raise IsolatedError('Expected string, got %r' % value)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001420 if value not in isolated_format.SUPPORTED_ALGOS:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001421 raise IsolatedError(
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001422 'Expected one of \'%s\', got %r' %
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001423 (', '.join(sorted(isolated_format.SUPPORTED_ALGOS)), value))
1424 if value != isolated_format.SUPPORTED_ALGOS_REVERSE[algo]:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001425 raise IsolatedError(
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001426 'Expected \'%s\', got %r' %
1427 (isolated_format.SUPPORTED_ALGOS_REVERSE[algo], value))
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001428
1429 elif key == 'command':
maruel@chromium.org41601642013-09-18 19:40:46 +00001430 if not isinstance(value, list):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001431 raise IsolatedError('Expected list, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001432 if not value:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001433 raise IsolatedError('Expected non-empty command')
maruel@chromium.org41601642013-09-18 19:40:46 +00001434 for subvalue in value:
1435 if not isinstance(subvalue, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001436 raise IsolatedError('Expected string, got %r' % subvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001437
1438 elif key == 'files':
1439 if not isinstance(value, dict):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001440 raise IsolatedError('Expected dict, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001441 for subkey, subvalue in value.iteritems():
1442 if not isinstance(subkey, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001443 raise IsolatedError('Expected string, got %r' % subkey)
maruel@chromium.org41601642013-09-18 19:40:46 +00001444 if not isinstance(subvalue, dict):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001445 raise IsolatedError('Expected dict, got %r' % subvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001446 for subsubkey, subsubvalue in subvalue.iteritems():
1447 if subsubkey == 'l':
1448 if not isinstance(subsubvalue, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001449 raise IsolatedError('Expected string, got %r' % subsubvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001450 elif subsubkey == 'm':
1451 if not isinstance(subsubvalue, int):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001452 raise IsolatedError('Expected int, got %r' % subsubvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001453 elif subsubkey == 'h':
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001454 if not isolated_format.is_valid_hash(subsubvalue, algo):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001455 raise IsolatedError('Expected sha-1, got %r' % subsubvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001456 elif subsubkey == 's':
Marc-Antoine Ruelaab3a622013-11-28 09:47:05 -05001457 if not isinstance(subsubvalue, (int, long)):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001458 raise IsolatedError('Expected int or long, got %r' % subsubvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001459 else:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001460 raise IsolatedError('Unknown subsubkey %s' % subsubkey)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001461 if bool('h' in subvalue) == bool('l' in subvalue):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001462 raise IsolatedError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001463 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1464 subvalue)
1465 if bool('h' in subvalue) != bool('s' in subvalue):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001466 raise IsolatedError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001467 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1468 subvalue)
1469 if bool('s' in subvalue) == bool('l' in subvalue):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001470 raise IsolatedError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001471 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1472 subvalue)
1473 if bool('l' in subvalue) and bool('m' in subvalue):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001474 raise IsolatedError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001475 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
maruel@chromium.org41601642013-09-18 19:40:46 +00001476 subvalue)
1477
1478 elif key == 'includes':
1479 if not isinstance(value, list):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001480 raise IsolatedError('Expected list, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001481 if not value:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001482 raise IsolatedError('Expected non-empty includes list')
maruel@chromium.org41601642013-09-18 19:40:46 +00001483 for subvalue in value:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001484 if not isolated_format.is_valid_hash(subvalue, algo):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001485 raise IsolatedError('Expected sha-1, got %r' % subvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001486
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001487 elif key == 'os':
1488 if version >= (1, 4):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001489 raise IsolatedError('Key \'os\' is not allowed starting version 1.4')
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001490
maruel@chromium.org41601642013-09-18 19:40:46 +00001491 elif key == 'read_only':
Marc-Antoine Ruel7124e392014-01-09 11:49:21 -05001492 if not value in (0, 1, 2):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001493 raise IsolatedError('Expected 0, 1 or 2, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001494
1495 elif key == 'relative_cwd':
1496 if not isinstance(value, basestring):
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001497 raise IsolatedError('Expected string, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001498
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001499 elif key == 'version':
1500 # Already checked above.
1501 pass
1502
maruel@chromium.org41601642013-09-18 19:40:46 +00001503 else:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001504 raise IsolatedError('Unknown key %r' % key)
maruel@chromium.org41601642013-09-18 19:40:46 +00001505
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001506 # Automatically fix os.path.sep if necessary. While .isolated files are always
1507  # in the native path format, someone could want to download a .isolated
1508 # tree from another OS.
1509 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1510 if 'files' in data:
1511 data['files'] = dict(
1512 (k.replace(wrong_path_sep, os.path.sep), v)
1513 for k, v in data['files'].iteritems())
1514 for v in data['files'].itervalues():
1515 if 'l' in v:
1516 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1517 if 'relative_cwd' in data:
1518 data['relative_cwd'] = data['relative_cwd'].replace(
1519 wrong_path_sep, os.path.sep)
maruel@chromium.org41601642013-09-18 19:40:46 +00001520 return data
1521
1522
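# A minimal document that passes the load_isolated() validation above; the
# digest is only a placeholder of the right length for sha-1, not the hash of
# real content.
def _example_minimal_isolated():
  algo = isolated_format.SUPPORTED_ALGOS['sha-1']
  doc = {
      'algo': 'sha-1',
      'command': ['python', 'run_test.py'],
      'files': {
          'run_test.py': {
              'h': '0123456789abcdef0123456789abcdef01234567',
              's': 1024,
              'm': 0500,
          },
      },
      'read_only': 1,
      'relative_cwd': '.',
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  return load_isolated(json.dumps(doc), algo)

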
1523class IsolatedFile(object):
1524 """Represents a single parsed .isolated file."""
1525 def __init__(self, obj_hash, algo):
1526 """|obj_hash| is really the sha-1 of the file."""
1527 logging.debug('IsolatedFile(%s)' % obj_hash)
1528 self.obj_hash = obj_hash
1529 self.algo = algo
1530 # Set once all the left-side of the tree is parsed. 'Tree' here means the
1531 # .isolate and all the .isolated files recursively included by it with
1532 # 'includes' key. The order of each sha-1 in 'includes', each representing a
1533 # .isolated file in the hash table, is important, as the later ones are not
1534    # processed until the first ones are retrieved and read.
1535 self.can_fetch = False
1536
1537 # Raw data.
1538 self.data = {}
1539    # An IsolatedFile instance, one per entry in self.data['includes'].
1540 self.children = []
1541
1542 # Set once the .isolated file is loaded.
1543 self._is_parsed = False
1544 # Set once the files are fetched.
1545 self.files_fetched = False
1546
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001547 def load(self, content):
maruel@chromium.org41601642013-09-18 19:40:46 +00001548 """Verifies the .isolated file is valid and loads this object with the json
1549 data.
1550 """
1551 logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
1552 assert not self._is_parsed
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001553 self.data = load_isolated(content, self.algo)
maruel@chromium.org41601642013-09-18 19:40:46 +00001554 self.children = [
1555 IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1556 ]
1557 self._is_parsed = True
1558
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001559 def fetch_files(self, fetch_queue, files):
maruel@chromium.org41601642013-09-18 19:40:46 +00001560 """Adds files in this .isolated file not present in |files| dictionary.
1561
1562    Preemptively requests the files.
1563
1564 Note that |files| is modified by this function.
1565 """
1566 assert self.can_fetch
1567 if not self._is_parsed or self.files_fetched:
1568 return
1569 logging.debug('fetch_files(%s)' % self.obj_hash)
1570 for filepath, properties in self.data.get('files', {}).iteritems():
1571 # Root isolated has priority on the files being mapped. In particular,
1572 # overriden files must not be fetched.
1573      # overridden files must not be fetched.
1574 files[filepath] = properties
1575 if 'h' in properties:
1576 # Preemptively request files.
1577 logging.debug('fetching %s' % filepath)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001578 fetch_queue.add(properties['h'], properties['s'], WorkerPool.MED)
maruel@chromium.org41601642013-09-18 19:40:46 +00001579 self.files_fetched = True
1580
1581
1582class Settings(object):
1583 """Results of a completely parsed .isolated file."""
1584 def __init__(self):
1585 self.command = []
1586 self.files = {}
1587 self.read_only = None
1588 self.relative_cwd = None
1589    # The main .isolated file, an IsolatedFile instance.
1590 self.root = None
1591
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001592 def load(self, fetch_queue, root_isolated_hash, algo):
maruel@chromium.org41601642013-09-18 19:40:46 +00001593 """Loads the .isolated and all the included .isolated asynchronously.
1594
1595 It enables support for "included" .isolated files. They are processed in
1596 strict order but fetched asynchronously from the cache. This is important so
1597 that a file in an included .isolated file that is overridden by an embedding
1598 .isolated file is not fetched needlessly. The includes are fetched in one
1599 pass and the files are fetched as soon as all the ones on the left-side
1600 of the tree were fetched.
1601
1602 The prioritization is very important here for nested .isolated files.
1603 'includes' have the highest priority and the algorithm is optimized for both
1604 deep and wide trees. A deep one is a long link of .isolated files referenced
1605 one at a time by one item in 'includes'. A wide one has a large number of
1606 'includes' in a single .isolated file. 'left' is defined as an included
1607 .isolated file earlier in the 'includes' list. So the order of the elements
1608 in 'includes' is important.
1609 """
1610 self.root = IsolatedFile(root_isolated_hash, algo)
1611
1612 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1613 pending = {}
1614 # Set of hashes of already retrieved items to refuse recursive includes.
1615 seen = set()
1616
1617 def retrieve(isolated_file):
1618 h = isolated_file.obj_hash
1619 if h in seen:
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001620 raise IsolatedError('IsolatedFile %s is retrieved recursively' % h)
maruel@chromium.org41601642013-09-18 19:40:46 +00001621 assert h not in pending
1622 seen.add(h)
1623 pending[h] = isolated_file
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001624 fetch_queue.add(h, priority=WorkerPool.HIGH)
maruel@chromium.org41601642013-09-18 19:40:46 +00001625
1626 retrieve(self.root)
1627
1628 while pending:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001629 item_hash = fetch_queue.wait(pending)
maruel@chromium.org41601642013-09-18 19:40:46 +00001630 item = pending.pop(item_hash)
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001631 item.load(fetch_queue.cache.read(item_hash))
maruel@chromium.org41601642013-09-18 19:40:46 +00001632 if item_hash == root_isolated_hash:
1633 # It's the root item.
1634 item.can_fetch = True
1635
1636 for new_child in item.children:
1637 retrieve(new_child)
1638
1639 # Traverse the whole tree to see if files can now be fetched.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001640 self._traverse_tree(fetch_queue, self.root)
maruel@chromium.org41601642013-09-18 19:40:46 +00001641
1642 def check(n):
1643 return all(check(x) for x in n.children) and n.files_fetched
1644 assert check(self.root)
1645
1646 self.relative_cwd = self.relative_cwd or ''
maruel@chromium.org41601642013-09-18 19:40:46 +00001647
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001648 def _traverse_tree(self, fetch_queue, node):
maruel@chromium.org41601642013-09-18 19:40:46 +00001649 if node.can_fetch:
1650 if not node.files_fetched:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001651 self._update_self(fetch_queue, node)
maruel@chromium.org41601642013-09-18 19:40:46 +00001652 will_break = False
1653 for i in node.children:
1654 if not i.can_fetch:
1655 if will_break:
1656 break
1657        # Automatically mark the first one as fetchable.
1658 i.can_fetch = True
1659 will_break = True
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001660 self._traverse_tree(fetch_queue, i)
maruel@chromium.org41601642013-09-18 19:40:46 +00001661
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001662 def _update_self(self, fetch_queue, node):
1663 node.fetch_files(fetch_queue, self.files)
maruel@chromium.org41601642013-09-18 19:40:46 +00001664 # Grabs properties.
1665 if not self.command and node.data.get('command'):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001666      # Ensure paths are correctly separated on Windows.
maruel@chromium.org41601642013-09-18 19:40:46 +00001667 self.command = node.data['command']
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001668 if self.command:
1669 self.command[0] = self.command[0].replace('/', os.path.sep)
1670 self.command = tools.fix_python_path(self.command)
maruel@chromium.org41601642013-09-18 19:40:46 +00001671 if self.read_only is None and node.data.get('read_only') is not None:
1672 self.read_only = node.data['read_only']
1673 if (self.relative_cwd is None and
1674 node.data.get('relative_cwd') is not None):
1675 self.relative_cwd = node.data['relative_cwd']
1676
1677
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001678def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001679 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001680
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001681 Arguments:
1682 isolated_hash: hash of the root *.isolated file.
1683 storage: Storage class that communicates with isolate storage.
1684 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001685 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001686 require_command: Ensure *.isolated specifies a command to run.
1687
1688 Returns:
1689 Settings object that holds details about loaded *.isolated file.
1690 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001691 logging.debug(
1692 'fetch_isolated(%s, %s, %s, %s, %s)',
1693 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001694 # Hash algorithm to use, defined by namespace |storage| is using.
1695 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001696 with cache:
1697 fetch_queue = FetchQueue(storage, cache)
1698 settings = Settings()
1699
1700 with tools.Profiler('GetIsolateds'):
1701 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001702 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001703 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1704 try:
1705 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1706 except IOError:
1707 raise MappingError(
1708            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1709 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001710
1711 # Load all *.isolated and start loading rest of the files.
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001712 settings.load(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001713 if require_command and not settings.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001714      # TODO(vadimsh): All fetch operations are already enqueued and there's no
1715 # easy way to cancel them.
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001716 raise IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001717
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001718 with tools.Profiler('GetRest'):
1719 # Create file system hierarchy.
1720 if not os.path.isdir(outdir):
1721 os.makedirs(outdir)
1722 create_directories(outdir, settings.files)
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -05001723 create_symlinks(outdir, settings.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001724
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001725 # Ensure working directory exists.
1726 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1727 if not os.path.isdir(cwd):
1728 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001729
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001730 # Multimap: digest -> list of pairs (path, props).
1731 remaining = {}
1732 for filepath, props in settings.files.iteritems():
1733 if 'h' in props:
1734 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001735
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001736 # Now block on the remaining files to be downloaded and mapped.
1737 logging.info('Retrieving remaining files (%d of them)...',
1738 fetch_queue.pending_count)
1739 last_update = time.time()
1740 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1741 while remaining:
1742 detector.ping()
1743
1744 # Wait for any item to finish fetching to cache.
1745 digest = fetch_queue.wait(remaining)
1746
1747 # Link corresponding files to a fetched item in cache.
1748 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001749 cache.hardlink(
1750 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001751
1752 # Report progress.
1753 duration = time.time() - last_update
1754 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1755 msg = '%d files remaining...' % len(remaining)
1756 print msg
1757 logging.info(msg)
1758 last_update = time.time()
1759
1760 # Cache could evict some items we just tried to fetch, it's a fatal error.
1761 if not fetch_queue.verify_all_cached():
1762 raise MappingError('Cache is too small to hold all requested files')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001763 return settings
1764
1765
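# A short sketch of mapping an isolated tree to disk, mirroring what
# CMDdownload does further below; the server URL is a placeholder and
# |isolated_hash| must be valid for the namespace's hash algorithm.
def _example_fetch_tree(isolated_hash, outdir):
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    settings = fetch_isolated(
        isolated_hash=isolated_hash,
        storage=storage,
        cache=MemoryCache(),
        outdir=outdir,
        require_command=False)
  return settings.command, settings.relative_cwd

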
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001766def directory_to_metadata(root, algo, blacklist):
1767 """Returns the FileItem list and .isolated metadata for a directory."""
1768 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001769 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001770 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001771 metadata = {
1772 relpath: isolated_format.file_to_metadata(
1773 os.path.join(root, relpath), {}, False, algo)
1774 for relpath in paths
1775 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001776 for v in metadata.itervalues():
1777 v.pop('t')
1778 items = [
1779 FileItem(
1780 path=os.path.join(root, relpath),
1781 digest=meta['h'],
1782 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001783 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001784 for relpath, meta in metadata.iteritems() if 'h' in meta
1785 ]
1786 return items, metadata
1787
1788
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001789def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001790  """Stores every entry and returns the relevant data.
1791
1792 Arguments:
1793 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001794 files: list of file paths to upload. If a directory is specified, a
1795 .isolated file is created and its hash is returned.
1796 blacklist: function that returns True if a file should be omitted.
1797 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001798 assert all(isinstance(i, unicode) for i in files), files
1799 if len(files) != len(set(map(os.path.abspath, files))):
1800 raise Error('Duplicate entries found.')
1801
1802 results = []
1803 # The temporary directory is only created as needed.
1804 tempdir = None
1805 try:
1806 # TODO(maruel): Yield the files to a worker thread.
1807 items_to_upload = []
1808 for f in files:
1809 try:
1810 filepath = os.path.abspath(f)
1811 if os.path.isdir(filepath):
1812 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001813 items, metadata = directory_to_metadata(
1814 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001815
1816 # Create the .isolated file.
1817 if not tempdir:
1818 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1819 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1820 os.close(handle)
1821 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001822 'algo':
1823 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001824 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001825 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001826 }
1827 save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001828 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001829 items_to_upload.extend(items)
1830 items_to_upload.append(
1831 FileItem(
1832 path=isolated,
1833 digest=h,
1834 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001835 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001836 results.append((h, f))
1837
1838 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001839 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001840 items_to_upload.append(
1841 FileItem(
1842 path=filepath,
1843 digest=h,
1844 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001845 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001846 results.append((h, f))
1847 else:
1848          raise Error('%s is neither a file nor a directory.' % f)
1849 except OSError:
1850 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001851    # Technically we would care about which files were uploaded, but in
1852    # practice we don't.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001853 _uploaded_files = storage.upload_items(items_to_upload)
1854 return results
1855 finally:
1856 if tempdir:
1857 shutil.rmtree(tempdir)
1858
1859
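# A compact sketch of archiving a directory, equivalent in spirit to the
# archive() helper below; the server URL is a placeholder and the paths passed
# to archive_files_to_storage() must be unicode.
def _example_archive_directory(directory):
  blacklist = tools.gen_blacklist(list(DEFAULT_BLACKLIST))
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    results = archive_files_to_storage(
        storage, [unicode(directory)], blacklist)
  # Each entry is an (isolated hash, original path) pair.
  return results

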
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001860def archive(out, namespace, files, blacklist):
1861 if files == ['-']:
1862 files = sys.stdin.readlines()
1863
1864 if not files:
1865 raise Error('Nothing to upload')
1866
1867 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001868 blacklist = tools.gen_blacklist(blacklist)
1869 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001870 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001871 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1872
1873
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001874@subcommand.usage('<file1..fileN> or - to read from stdin')
1875def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001876 """Archives data to the server.
1877
1878  If a directory is specified, a .isolated file is created and the whole
1879  directory is uploaded. Then this .isolated file can be included in another
1880  one to run commands.
1881
1882  The command outputs each file that was processed with its content hash. For
1883 directories, the .isolated generated for the directory is listed as the
1884 directory entry itself.
1885 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001886 add_isolate_server_options(parser, False)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001887 parser.add_option(
1888 '--blacklist',
1889 action='append', default=list(DEFAULT_BLACKLIST),
1890 help='List of regexp to use as blacklist filter when uploading '
1891 'directories')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001892 options, files = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001893 process_isolate_server_options(parser, options)
Vadim Shtayura6b555c12014-07-23 16:22:18 -07001894 if file_path.is_url(options.isolate_server):
1895 auth.ensure_logged_in(options.isolate_server)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001896 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001897 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001898 except Error as e:
1899 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001900 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001901
1902
1903def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001904 """Download data from the server.
1905
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001906 It can either download individual files or a complete tree from a .isolated
1907 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001908 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001909 add_isolate_server_options(parser, True)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001910 parser.add_option(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001911 '-i', '--isolated', metavar='HASH',
1912 help='hash of an isolated file, .isolated file content is discarded, use '
1913 '--file if you need it')
1914 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001915 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1916 help='hash and destination of a file, can be used multiple times')
1917 parser.add_option(
1918 '-t', '--target', metavar='DIR', default=os.getcwd(),
1919 help='destination directory')
1920 options, args = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001921 process_isolate_server_options(parser, options)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001922 if args:
1923 parser.error('Unsupported arguments: %s' % args)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001924 if bool(options.isolated) == bool(options.file):
1925 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001926
1927 options.target = os.path.abspath(options.target)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001928
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001929 remote = options.isolate_server or options.indir
Vadim Shtayura6b555c12014-07-23 16:22:18 -07001930 if file_path.is_url(remote):
1931 auth.ensure_logged_in(remote)
1932
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001933 with get_storage(remote, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001934 # Fetching individual files.
1935 if options.file:
1936 channel = threading_utils.TaskChannel()
1937 pending = {}
1938 for digest, dest in options.file:
1939 pending[digest] = dest
1940 storage.async_fetch(
1941 channel,
1942 WorkerPool.MED,
1943 digest,
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001944 isolated_format.UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001945 functools.partial(file_write, os.path.join(options.target, dest)))
1946 while pending:
1947 fetched = channel.pull()
1948 dest = pending.pop(fetched)
1949 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001950
Vadim Shtayura3172be52013-12-03 12:49:05 -08001951 # Fetching whole isolated tree.
1952 if options.isolated:
1953 settings = fetch_isolated(
1954 isolated_hash=options.isolated,
1955 storage=storage,
1956 cache=MemoryCache(),
Vadim Shtayura3172be52013-12-03 12:49:05 -08001957 outdir=options.target,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001958 require_command=False)
1959 rel = os.path.join(options.target, settings.relative_cwd)
1960 print('To run this test please run from the directory %s:' %
1961 os.path.join(options.target, rel))
1962 print(' ' + ' '.join(settings.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001963
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001964 return 0
1965
1966
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001967@subcommand.usage('<file1..fileN> or - to read from stdin')
1968def CMDhashtable(parser, args):
1969 """Archives data to a hashtable on the file system.
1970
1971  If a directory is specified, a .isolated file is created and the whole
1972  directory is uploaded. Then this .isolated file can be included in another
1973  one to run commands.
1974
1975  The command outputs each file that was processed with its content hash. For
1976 directories, the .isolated generated for the directory is listed as the
1977 directory entry itself.
1978 """
1979 add_outdir_options(parser)
1980 parser.add_option(
1981 '--blacklist',
1982 action='append', default=list(DEFAULT_BLACKLIST),
1983 help='List of regexp to use as blacklist filter when uploading '
1984 'directories')
1985 options, files = parser.parse_args(args)
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001986 process_outdir_options(parser, options, os.getcwd())
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001987 try:
1988 # Do not compress files when archiving to the file system.
1989 archive(options.outdir, 'default', files, options.blacklist)
1990 except Error as e:
1991 parser.error(e.args[0])
1992 return 0
1993
1994
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001995def add_isolate_server_options(parser, add_indir):
1996 """Adds --isolate-server and --namespace options to parser.
1997
1998 Includes --indir if desired.
1999 """
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002000 parser.add_option(
2001 '-I', '--isolate-server',
2002 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002003 help='URL of the Isolate Server to use. Defaults to the environment '
2004 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2005 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002006 parser.add_option(
2007 '--namespace', default='default-gzip',
2008 help='The namespace to use on the Isolate Server, default: %default')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002009 if add_indir:
2010 parser.add_option(
2011 '--indir', metavar='DIR',
2012 help='Directory used to store the hashtable instead of using an '
2013 'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002014
2015
2016def process_isolate_server_options(parser, options):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002017 """Processes the --isolate-server and --indir options and aborts if neither is
2018 specified.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002019 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002020 has_indir = hasattr(options, 'indir')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002021 if not options.isolate_server:
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002022 if not has_indir:
2023 parser.error('--isolate-server is required.')
2024 elif not options.indir:
2025 parser.error('Use one of --indir or --isolate-server.')
2026 else:
2027 if has_indir and options.indir:
2028 parser.error('Use only one of --indir or --isolate-server.')
2029
2030 if options.isolate_server:
2031 parts = urlparse.urlparse(options.isolate_server, 'https')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002032 if parts.query:
2033 parser.error('--isolate-server doesn\'t support query parameter.')
2034 if parts.fragment:
2035 parser.error('--isolate-server doesn\'t support fragment in the url.')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002036 # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
2037 # what is desired here.
2038 new = list(parts)
2039 if not new[1] and new[2]:
2040 new[1] = new[2].rstrip('/')
2041 new[2] = ''
2042 new[2] = new[2].rstrip('/')
2043 options.isolate_server = urlparse.urlunparse(new)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002044 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002045 return
2046
2047 if file_path.is_url(options.indir):
2048    parser.error('Can\'t use a URL for --indir.')
2049 options.indir = unicode(options.indir).replace('/', os.path.sep)
2050 options.indir = os.path.abspath(
2051 os.path.normpath(os.path.join(os.getcwd(), options.indir)))
2052 if not os.path.isdir(options.indir):
2053 parser.error('Path given to --indir must exist.')
2054
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002055
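# For illustration only, a standalone version of the --isolate-server
# normalization done above; the hosts are placeholders. A bare host such as
# 'foo.example.com' becomes 'https://foo.example.com' and a trailing slash is
# stripped.
def _example_normalize_isolate_server(raw_url):
  parts = list(urlparse.urlparse(raw_url, 'https'))
  if not parts[1] and parts[2]:
    # A bare host ends up in the path component; move it to netloc.
    parts[1] = parts[2].rstrip('/')
    parts[2] = ''
  parts[2] = parts[2].rstrip('/')
  return urlparse.urlunparse(parts)

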
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002056def add_outdir_options(parser):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002057 """Adds --outdir, which is orthogonal to --isolate-server.
2058
2059 Note: On upload, separate commands are used between 'archive' and 'hashtable'.
2060 On 'download', the same command can download from either an isolate server or
2061 a file system.
2062 """
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002063 parser.add_option(
2064 '-o', '--outdir', metavar='DIR',
2065 help='Directory used to recreate the tree.')
2066
2067
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002068def process_outdir_options(parser, options, cwd):
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002069 if not options.outdir:
2070 parser.error('--outdir is required.')
2071 if file_path.is_url(options.outdir):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002072 parser.error('Can\'t use an URL for --outdir.')
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002073 options.outdir = unicode(options.outdir).replace('/', os.path.sep)
2074 # outdir doesn't need native path case since tracing is never done from there.
2075 options.outdir = os.path.abspath(
2076 os.path.normpath(os.path.join(cwd, options.outdir)))
2077 # In theory, we'd create the directory outdir right away. Defer doing it in
2078  # case there are errors in the command line.
2079
2080
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002081class OptionParserIsolateServer(tools.OptionParserWithLogging):
2082 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002083 tools.OptionParserWithLogging.__init__(
2084 self,
2085 version=__version__,
2086 prog=os.path.basename(sys.modules[__name__].__file__),
2087 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002088 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002089
2090 def parse_args(self, *args, **kwargs):
2091 options, args = tools.OptionParserWithLogging.parse_args(
2092 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002093 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002094 return options, args
2095
2096
2097def main(args):
2098 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002099 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002100
2101
2102if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002103 fix_encoding.fix_encoding()
2104 tools.disable_buffering()
2105 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002106 sys.exit(main(sys.argv[1:]))