blob: 9ed00a03781e46798885bd6c97abdded56cc716e [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040012import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000014import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040015import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050017import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000018import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000020import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050021import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040029from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040031from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040036import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here are a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of file paths that are ignored by default when archiving.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
102
103
class Error(Exception):
  """Generic runtime error raised by this module."""
107
108
class Aborted(Error):
  """Raised when an in-progress operation was aborted."""
112
113
def stream_read(stream, chunk_size):
  """Yields successive |chunk_size| reads from |stream| until it is exhausted."""
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
121
122
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of |filepath| in |chunk_size| pieces, from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
133
134
def file_write(filepath, content_generator):
  """Writes a file out of the chunks produced by |content_generator|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      written += len(chunk)
      out.write(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000153
154
def zip_compress(content_generator, level=7):
  """Yields the zlib-compressed form of the chunks in |content_generator|."""
  compressor = zlib.compressobj(level)
  for piece in content_generator:
    out = compressor.compress(piece)
    if out:
      yield out
  trailing = compressor.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
165
166
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  # Total compressed bytes consumed so far, only used for the error message.
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # |chunk_size| caps how much decompressed data a single call may produce.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # If the cap was hit, zlib keeps the rest of the input in
      # unconsumed_tail; keep draining it in |chunk_size| pieces.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Flush whatever remains buffered inside the decompressor.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
197
198
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.png') while entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('png'); strip the dot,
  # otherwise the membership test below can never match.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
204
205
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  required = set()
  for filepath in files:
    parent = os.path.dirname(filepath)
    while parent and parent not in required:
      required.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees a parent is created before its children.
  for directory in sorted(required):
    os.mkdir(os.path.join(base_directory, directory))
218
219
def create_symlinks(base_directory, files):
  """Creates the symlinks described by the 'l' property of |files|.

  Entries without an 'l' property are left alone; on Windows symlinks are
  skipped with a warning since they are not supported there.
  """
  for relpath, props in files:
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    link_path = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], link_path)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000232
233
def is_valid_file(filepath, size):
  """Checks whether the file at |filepath| looks valid.

  Validity is currently judged on file size only: |size| must match the
  on-disk size, or, when the size is unknown, the path merely has to exist
  as a regular file.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  found_size = os.stat(filepath).st_size
  if found_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), found_size, size)
  return False
248
249
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; may be filled in later by prepare().
    self.digest = digest
    # Size in bytes of the content; may be filled in later by prepare().
    self.size = size
    # When True, Storage schedules the upload with high priority.
    self.high_priority = high_priority
    # Default zlib compression level used when the namespace compresses.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Makes sure self.digest and self.size are filled in.

    Streams content() through |hash_algo| to compute them. Does nothing when
    both values are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    byte_count = 0
    for chunk in self.content():
      hasher.update(chunk)
      byte_count += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = byte_count
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000289
290
class FileItem(Item):
  """A file on disk to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat() the file when the caller didn't supply the size.
    file_size = os.stat(path).st_size if size is None else size
    super(FileItem, self).__init__(digest, file_size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000308
309
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
319
320
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000321class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800322 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000323
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800324 Implements compression support, parallel 'contains' checks, parallel uploads
325 and more.
326
327 Works only within single namespace (and thus hashing algorithm and compression
328 scheme are fixed).
329
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400330 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
331 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800332 """
333
  def __init__(self, storage_api):
    # Low-level StorageApi instance that performs the raw network calls.
    self._storage_api = storage_api
    # Whether the namespace implies zlib compression of stored content.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    # Hash algorithm used for content digests, defined by the namespace.
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set to True by abort(); polled by in-flight tasks.
    self._aborted = False
    # Signal handlers saved by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000343
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000344 @property
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700345 def hash_algo(self):
346 """Hashing algorithm used to name files in storage based on their content.
347
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400348 Defined by |namespace|. See also isolated_format.get_hash_algo().
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700349 """
350 return self._hash_algo
351
352 @property
353 def location(self):
354 """Location of a backing store that this class is using.
355
356 Exact meaning depends on the storage_api type. For IsolateServer it is
357 an URL of isolate server, for FileSystem is it a path in file system.
358 """
359 return self._storage_api.location
360
361 @property
362 def namespace(self):
363 """Isolate namespace used by this storage.
364
365 Indirectly defines hashing scheme and compression method used.
366 """
367 return self._storage_api.namespace
368
369 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000370 def cpu_thread_pool(self):
371 """ThreadPool for CPU-bound tasks like zipping."""
372 if self._cpu_thread_pool is None:
373 self._cpu_thread_pool = threading_utils.ThreadPool(
374 2, max(threading_utils.num_processors(), 2), 0, 'zip')
375 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000376
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000377 @property
378 def net_thread_pool(self):
379 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
380 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700381 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000382 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000383
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000384 def close(self):
385 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400386 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000387 if self._cpu_thread_pool:
388 self._cpu_thread_pool.join()
389 self._cpu_thread_pool.close()
390 self._cpu_thread_pool = None
391 if self._net_thread_pool:
392 self._net_thread_pool.join()
393 self._net_thread_pool.close()
394 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400395 logging.info('Done.')
396
397 def abort(self):
398 """Cancels any pending or future operations."""
399 # This is not strictly theadsafe, but in the worst case the logging message
400 # will be printed twice. Not a big deal. In other places it is assumed that
401 # unprotected reads and writes to _aborted are serializable (it is true
402 # for python) and thus no locking is used.
403 if not self._aborted:
404 logging.warning('Aborting... It can take a while.')
405 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000406
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000407 def __enter__(self):
408 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400409 assert not self._prev_sig_handlers, self._prev_sig_handlers
410 for s in (signal.SIGINT, signal.SIGTERM):
411 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000412 return self
413
414 def __exit__(self, _exc_type, _exc_value, _traceback):
415 """Context manager interface."""
416 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400417 while self._prev_sig_handlers:
418 s, h = self._prev_sig_handlers.popitem()
419 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000420 return False
421
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000422 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800423 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000424
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800425 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000426
427 Arguments:
428 items: list of Item instances that represents data to upload.
429
430 Returns:
431 List of items that were uploaded. All other items are already there.
432 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700433 logging.info('upload_items(items=%d)', len(items))
434
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800435 # Ensure all digests are calculated.
436 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700437 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800438
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000439 # For each digest keep only first Item that matches it. All other items
440 # are just indistinguishable copies from the point of view of isolate
441 # server (it doesn't care about paths at all, only content and digests).
442 seen = {}
443 duplicates = 0
444 for item in items:
445 if seen.setdefault(item.digest, item) is not item:
446 duplicates += 1
447 items = seen.values()
448 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700449 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000450
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000451 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000452 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000453 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800454 channel = threading_utils.TaskChannel()
455 for missing_item, push_state in self.get_missing_items(items):
456 missing.add(missing_item)
457 self.async_push(channel, missing_item, push_state)
458
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000459 # No need to spawn deadlock detector thread if there's nothing to upload.
460 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700461 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000462 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000463 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000464 detector.ping()
465 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000466 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000467 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000468 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000469 logging.info('All files are uploaded')
470
471 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000472 total = len(items)
473 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000474 logging.info(
475 'Total: %6d, %9.1fkb',
476 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000477 total_size / 1024.)
478 cache_hit = set(items) - missing
479 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000480 logging.info(
481 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
482 len(cache_hit),
483 cache_hit_size / 1024.,
484 len(cache_hit) * 100. / total,
485 cache_hit_size * 100. / total_size if total_size else 0)
486 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000487 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000488 logging.info(
489 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
490 len(cache_miss),
491 cache_miss_size / 1024.,
492 len(cache_miss) * 100. / total,
493 cache_miss_size * 100. / total_size if total_size else 0)
494
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000495 return uploaded
496
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800497 def get_fetch_url(self, item):
498 """Returns an URL that can be used to fetch given item once it's uploaded.
499
500 Note that if namespace uses compression, data at given URL is compressed.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000501
502 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800503 item: Item to get fetch URL for.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000504
505 Returns:
506 An URL or None if underlying protocol doesn't support this.
507 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700508 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800509 return self._storage_api.get_fetch_url(item.digest)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000510
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      # Bail out early if abort() was called; Aborted propagates to |channel|.
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Forward the failure to whoever is waiting on |channel|.
        channel.send_exception()
        return
      # Hand the compressed blob over to the network pool for the real push.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000562
  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    # DeadlockDetector dumps all thread stacks to the log if the upload makes
    # no progress for DEADLOCK_TIMEOUT seconds.
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      # async_push() sends back the very same Item instance on success.
      assert pushed is item
    return item
582
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        # Stored data is compressed when the namespace says so; decompress
        # on the fly while streaming.
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        # Re-raise so the thread pool forwards the exception to |channel|.
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
611
  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    # Number of 'contains' batches whose results are still outstanding.
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      # Bail out early if abort() was called; Aborted propagates to |channel|.
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in. Each pull() returns one batch's result:
    # a mapping of missing Item -> push_state.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000650
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000651
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800652def batch_items_for_check(items):
653 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000654
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800655 Each batch corresponds to a single 'exists?' query to the server via a call
656 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000657
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800658 Arguments:
659 items: a list of Item objects.
660
661 Yields:
662 Batches of items to query for existence in a single operation,
663 each batch is a list of Item objects.
664 """
665 batch_count = 0
666 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
667 next_queries = []
668 for item in sorted(items, key=lambda x: x.size, reverse=True):
669 next_queries.append(item)
670 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000671 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800672 next_queries = []
673 batch_count += 1
674 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
675 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
676 if next_queries:
677 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000678
679
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000680class FetchQueue(object):
681 """Fetches items from Storage and places them into LocalCache.
682
683 It manages multiple concurrent fetch operations. Acts as a bridge between
684 Storage and LocalCache so that Storage and LocalCache don't depend on each
685 other at all.
686 """
687
688 def __init__(self, storage, cache):
689 self.storage = storage
690 self.cache = cache
691 self._channel = threading_utils.TaskChannel()
692 self._pending = set()
693 self._accessed = set()
694 self._fetched = cache.cached_set()
695
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400696 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700697 self,
698 digest,
699 size=UNKNOWN_FILE_SIZE,
700 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000701 """Starts asynchronous fetch of item |digest|."""
702 # Fetching it now?
703 if digest in self._pending:
704 return
705
706 # Mark this file as in use, verify_all_cached will later ensure it is still
707 # in cache.
708 self._accessed.add(digest)
709
710 # Already fetched? Notify cache to update item's LRU position.
711 if digest in self._fetched:
712 # 'touch' returns True if item is in cache and not corrupted.
713 if self.cache.touch(digest, size):
714 return
715 # Item is corrupted, remove it from cache and fetch it again.
716 self._fetched.remove(digest)
717 self.cache.evict(digest)
718
719 # TODO(maruel): It should look at the free disk space, the current cache
720 # size and the size of the new item on every new item:
721 # - Trim the cache as more entries are listed when free disk space is low,
722 # otherwise if the amount of data downloaded during the run > free disk
723 # space, it'll crash.
724 # - Make sure there's enough free disk space to fit all dependencies of
725 # this run! If not, abort early.
726
727 # Start fetching.
728 self._pending.add(digest)
729 self.storage.async_fetch(
730 self._channel, priority, digest, size,
731 functools.partial(self.cache.write, digest))
732
733 def wait(self, digests):
734 """Starts a loop that waits for at least one of |digests| to be retrieved.
735
736 Returns the first digest retrieved.
737 """
738 # Flush any already fetched items.
739 for digest in digests:
740 if digest in self._fetched:
741 return digest
742
743 # Ensure all requested items are being fetched now.
744 assert all(digest in self._pending for digest in digests), (
745 digests, self._pending)
746
747 # Wait for some requested item to finish fetching.
748 while self._pending:
749 digest = self._channel.pull()
750 self._pending.remove(digest)
751 self._fetched.add(digest)
752 if digest in digests:
753 return digest
754
755 # Should never reach this point due to assert above.
756 raise RuntimeError('Impossible state')
757
758 def inject_local_file(self, path, algo):
759 """Adds local file to the cache as if it was fetched from storage."""
760 with open(path, 'rb') as f:
761 data = f.read()
762 digest = algo(data).hexdigest()
763 self.cache.write(digest, [data])
764 self._fetched.add(digest)
765 return digest
766
767 @property
768 def pending_count(self):
769 """Returns number of items to be fetched."""
770 return len(self._pending)
771
772 def verify_all_cached(self):
773 """True if all accessed items are in cache."""
774 return self._accessed.issubset(self.cache.cached_set())
775
776
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """Arguments:
      stream: an iterable that yields fetched chunks (str).
      expected_size: expected total size in bytes, or UNKNOWN_FILE_SIZE if the
          size is not known in advance.
    """
    self.stream = stream
    self.expected_size = expected_size
    # Total number of bytes seen so far; compared against |expected_size|.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    else:
      # The stream produced no chunks at all. Previously this case skipped
      # size verification entirely, silently accepting an empty response even
      # when a non-empty file was expected. Run the final size check against
      # an empty chunk so a truncated (empty) stream raises IOError.
      self._inspect_chunk('', is_last=True)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    Raises:
      IOError if this is the last chunk, the expected size is known and the
      total size seen doesn't match it.
    """
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
823
824
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000825class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800826 """Interface for classes that implement low-level storage operations.
827
828 StorageApi is oblivious of compression and hashing scheme used. This details
829 are handled in higher level Storage class.
830
831 Clients should generally not use StorageApi directly. Storage class is
832 preferred since it implements compression and upload optimizations.
833 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000834
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700835 @property
836 def location(self):
837 """Location of a backing store that this class is using.
838
839 Exact meaning depends on the type. For IsolateServer it is an URL of isolate
840 server, for FileSystem is it a path in file system.
841 """
842 raise NotImplementedError()
843
844 @property
845 def namespace(self):
846 """Isolate namespace used by this storage.
847
848 Indirectly defines hashing scheme and compression method used.
849 """
850 raise NotImplementedError()
851
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000852 def get_fetch_url(self, digest):
853 """Returns an URL that can be used to fetch an item with given digest.
854
855 Arguments:
856 digest: hex digest of item to fetch.
857
858 Returns:
859 An URL or None if the protocol doesn't support this.
860 """
861 raise NotImplementedError()
862
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800863 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000864 """Fetches an object and yields its content.
865
866 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000867 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800868 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000869
870 Yields:
871 Chunks of downloaded item (as str objects).
872 """
873 raise NotImplementedError()
874
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800875 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000876 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000877
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800878 |item| MUST go through 'contains' call to get |push_state| before it can
879 be pushed to the storage.
880
881 To be clear, here is one possible usage:
882 all_items = [... all items to push as Item subclasses ...]
883 for missing_item, push_state in storage_api.contains(all_items).items():
884 storage_api.push(missing_item, push_state)
885
886 When pushing to a namespace with compression, data that should be pushed
887 and data provided by the item is not the same. In that case |content| is
888 not None and it yields chunks of compressed data (using item.content() as
889 a source of original uncompressed data). This is implemented by Storage
890 class.
891
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000892 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000893 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800894 push_state: push state object as returned by 'contains' call.
895 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000896
897 Returns:
898 None.
899 """
900 raise NotImplementedError()
901
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000902 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800903 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000904
905 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800906 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000907
908 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800909 A dict missing Item -> opaque push state object to be passed to 'push'.
910 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000911 """
912 raise NotImplementedError()
913
914
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800915class _IsolateServerPushState(object):
916 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500917
918 Note this needs to be a global class to support pickling.
919 """
920
921 def __init__(self, upload_url, finalize_url):
922 self.upload_url = upload_url
923 self.finalize_url = finalize_url
924 self.uploaded = False
925 self.finalized = False
926
927
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000928class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000929 """StorageApi implementation that downloads and uploads to Isolate Server.
930
931 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800932 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000933 """
934
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000935 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000936 super(IsolateServer, self).__init__()
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000937 assert base_url.startswith('http'), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700938 self._base_url = base_url.rstrip('/')
939 self._namespace = namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000940 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000941 self._server_caps = None
942
943 @staticmethod
944 def _generate_handshake_request():
945 """Returns a dict to be sent as handshake request body."""
946 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
947 return {
948 'client_app_version': __version__,
949 'fetcher': True,
950 'protocol_version': ISOLATE_PROTOCOL_VERSION,
951 'pusher': True,
952 }
953
954 @staticmethod
955 def _validate_handshake_response(caps):
956 """Validates and normalizes handshake response."""
957 logging.info('Protocol version: %s', caps['protocol_version'])
958 logging.info('Server version: %s', caps['server_app_version'])
959 if caps.get('error'):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400960 raise isolated_format.MappingError(caps['error'])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000961 if not caps['access_token']:
962 raise ValueError('access_token is missing')
963 return caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000964
965 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000966 def _server_capabilities(self):
967 """Performs handshake with the server if not yet done.
968
969 Returns:
970 Server capabilities dictionary as returned by /handshake endpoint.
971
972 Raises:
973 MappingError if server rejects the handshake.
974 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000975 # TODO(maruel): Make this request much earlier asynchronously while the
976 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800977
978 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
979 # namespace-level ACLs to this call.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000980 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000981 if self._server_caps is None:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000982 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -0400983 caps = net.url_read_json(
984 url=self._base_url + '/content-gs/handshake',
985 data=self._generate_handshake_request())
986 if caps is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400987 raise isolated_format.MappingError('Failed to perform handshake.')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000988 if not isinstance(caps, dict):
989 raise ValueError('Expecting JSON dict')
990 self._server_caps = self._validate_handshake_response(caps)
991 except (ValueError, KeyError, TypeError) as exc:
992 # KeyError exception has very confusing str conversion: it's just a
993 # missing key value and nothing else. So print exception class name
994 # as well.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400995 raise isolated_format.MappingError(
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -0400996 'Invalid handshake response (%s): %s' % (
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000997 exc.__class__.__name__, exc))
998 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000999
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001000 @property
1001 def location(self):
1002 return self._base_url
1003
1004 @property
1005 def namespace(self):
1006 return self._namespace
1007
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001008 def get_fetch_url(self, digest):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001009 assert isinstance(digest, basestring)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001010 return '%s/content-gs/retrieve/%s/%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001011 self._base_url, self._namespace, digest)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001012
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001013 def fetch(self, digest, offset=0):
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001014 source_url = self.get_fetch_url(digest)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001015 logging.debug('download_file(%s, %d)', source_url, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001016
Vadim Shtayura8623c272014-12-01 11:45:27 -08001017 connection = self.do_fetch(source_url, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001018 if not connection:
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001019 raise IOError('Request failed - %s' % source_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001020
1021 # If |offset| is used, verify server respects it by checking Content-Range.
1022 if offset:
1023 content_range = connection.get_header('Content-Range')
1024 if not content_range:
1025 raise IOError('Missing Content-Range header')
1026
1027 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1028 # According to a spec, <size> can be '*' meaning "Total size of the file
1029 # is not known in advance".
1030 try:
1031 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1032 if not match:
1033 raise ValueError()
1034 content_offset = int(match.group(1))
1035 last_byte_index = int(match.group(2))
1036 size = None if match.group(3) == '*' else int(match.group(3))
1037 except ValueError:
1038 raise IOError('Invalid Content-Range header: %s' % content_range)
1039
1040 # Ensure returned offset equals requested one.
1041 if offset != content_offset:
1042 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1043 offset, content_offset, content_range))
1044
1045 # Ensure entire tail of the file is returned.
1046 if size is not None and last_byte_index + 1 != size:
1047 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1048
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001049 return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001050
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001051 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001052 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001053 assert item.digest is not None
1054 assert item.size is not None
1055 assert isinstance(push_state, _IsolateServerPushState)
1056 assert not push_state.finalized
1057
1058 # Default to item.content().
1059 content = item.content() if content is None else content
1060
1061 # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
1062 if isinstance(content, basestring):
1063 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1064 content = [content]
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001065
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001066 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1067 # If |content| is indeed a generator, it can not be re-winded back
1068 # to the beginning of the stream. A retry will find it exhausted. A possible
1069 # solution is to wrap |content| generator with some sort of caching
1070 # restartable generator. It should be done alongside streaming support
1071 # implementation.
1072
1073 # This push operation may be a retry after failed finalization call below,
1074 # no need to reupload contents in that case.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001075 if not push_state.uploaded:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001076 # PUT file to |upload_url|.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001077 success = self.do_push(push_state.upload_url, content)
1078 if not success:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001079 raise IOError('Failed to upload a file %s to %s' % (
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001080 item.digest, push_state.upload_url))
1081 push_state.uploaded = True
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001082 else:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001083 logging.info(
1084 'A file %s already uploaded, retrying finalization only', item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001085
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001086 # Optionally notify the server that it's done.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001087 if push_state.finalize_url:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001088 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1089 # send it to isolated server. That way isolate server can verify that
1090 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1091 # stored files).
Vadim Shtayura8623c272014-12-01 11:45:27 -08001092 # TODO(maruel): Fix the server to accept properly data={} so
Marc-Antoine Ruelc1c2ccc2014-08-13 19:18:49 -04001093 # url_read_json() can be used.
1094 response = net.url_read(
1095 url=push_state.finalize_url,
1096 data='',
1097 content_type='application/json',
1098 method='POST')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001099 if response is None:
1100 raise IOError('Failed to finalize an upload of %s' % item.digest)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001101 push_state.finalized = True
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001102
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001103 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001104 # Ensure all items were initialized with 'prepare' call. Storage does that.
1105 assert all(i.digest is not None and i.size is not None for i in items)
1106
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001107 # Request body is a json encoded list of dicts.
1108 body = [
1109 {
1110 'h': item.digest,
1111 's': item.size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001112 'i': int(item.high_priority),
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001113 } for item in items
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001114 ]
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001115
1116 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001117 self._base_url,
1118 self._namespace,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001119 urllib.quote(self._server_capabilities['access_token']))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001120
1121 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001122 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001123 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001124 response = net.url_read_json(url=query_url, data=body)
1125 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001126 raise isolated_format.MappingError(
1127 'Failed to execute /pre-upload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001128 if not isinstance(response, list):
1129 raise ValueError('Expecting response with json-encoded list')
1130 if len(response) != len(items):
1131 raise ValueError(
1132 'Incorrect number of items in the list, expected %d, '
1133 'but got %d' % (len(items), len(response)))
1134 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001135 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001136 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001137
1138 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001139 missing_items = {}
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001140 for i, push_urls in enumerate(response):
1141 if push_urls:
1142 assert len(push_urls) == 2, str(push_urls)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001143 missing_items[items[i]] = _IsolateServerPushState(
1144 push_urls[0], push_urls[1])
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001145 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001146 len(items), len(items) - len(missing_items))
1147 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001148
Vadim Shtayura8623c272014-12-01 11:45:27 -08001149 def do_fetch(self, url, offset):
1150 """Fetches isolated data from the URL.
1151
1152 Used only for fetching files, not for API calls. Can be overridden in
1153 subclasses.
1154
1155 Args:
1156 url: URL to fetch the data from, can possibly return http redirect.
1157 offset: byte offset inside the file to start fetching from.
1158
1159 Returns:
1160 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1161 """
1162 return net.url_open(
1163 url,
1164 read_timeout=DOWNLOAD_READ_TIMEOUT,
1165 headers={'Range': 'bytes=%d-' % offset} if offset else None)
1166
1167 def do_push(self, url, content):
1168 """Uploads isolated file to the URL.
1169
1170 Used only for storing files, not for API calls. Can be overridden in
1171 subclasses.
1172
1173 Args:
1174 url: URL to upload the data to.
1175 content: an iterable that yields 'str' chunks.
1176
1177 Returns:
1178 True on success, False on failure.
1179 """
1180 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1181 # upload support is implemented.
1182 if isinstance(content, list) and len(content) == 1:
1183 content = content[0]
1184 else:
1185 content = ''.join(content)
1186 response = net.url_read(
1187 url=url,
1188 data=content,
1189 content_type='application/octet-stream',
1190 method='PUT')
1191 return response is not None
1192
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001193
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001194class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001195 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001196
1197 The common use case is a NFS/CIFS file server that is mounted locally that is
1198 used to fetch the file on a local partition.
1199 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001200
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001201 # Used for push_state instead of None. That way caller is forced to
1202 # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1203 _DUMMY_PUSH_STATE = object()
1204
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001205 def __init__(self, base_path, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001206 super(FileSystem, self).__init__()
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001207 self._base_path = base_path
1208 self._namespace = namespace
1209
1210 @property
1211 def location(self):
1212 return self._base_path
1213
1214 @property
1215 def namespace(self):
1216 return self._namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001217
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001218 def get_fetch_url(self, digest):
1219 return None
1220
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001221 def fetch(self, digest, offset=0):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001222 assert isinstance(digest, basestring)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001223 return file_read(os.path.join(self._base_path, digest), offset=offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001224
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001225 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001226 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001227 assert item.digest is not None
1228 assert item.size is not None
1229 assert push_state is self._DUMMY_PUSH_STATE
1230 content = item.content() if content is None else content
1231 if isinstance(content, basestring):
1232 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1233 content = [content]
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001234 file_write(os.path.join(self._base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001235
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001236 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001237 assert all(i.digest is not None and i.size is not None for i in items)
1238 return dict(
1239 (item, self._DUMMY_PUSH_STATE) for item in items
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001240 if not os.path.exists(os.path.join(self._base_path, item.digest))
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001241 )
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001242
1243
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001244class LocalCache(object):
1245 """Local cache that stores objects fetched via Storage.
1246
1247 It can be accessed concurrently from multiple threads, so it should protect
1248 its internal state with some lock.
1249 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001250 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001251
1252 def __enter__(self):
1253 """Context manager interface."""
1254 return self
1255
1256 def __exit__(self, _exc_type, _exec_value, _traceback):
1257 """Context manager interface."""
1258 return False
1259
1260 def cached_set(self):
1261 """Returns a set of all cached digests (always a new object)."""
1262 raise NotImplementedError()
1263
1264 def touch(self, digest, size):
1265 """Ensures item is not corrupted and updates its LRU position.
1266
1267 Arguments:
1268 digest: hash digest of item to check.
1269 size: expected size of this item.
1270
1271 Returns:
1272 True if item is in cache and not corrupted.
1273 """
1274 raise NotImplementedError()
1275
1276 def evict(self, digest):
1277 """Removes item from cache if it's there."""
1278 raise NotImplementedError()
1279
1280 def read(self, digest):
1281 """Returns contents of the cached item as a single str."""
1282 raise NotImplementedError()
1283
1284 def write(self, digest, content):
1285 """Reads data from |content| generator and stores it in cache."""
1286 raise NotImplementedError()
1287
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001288 def hardlink(self, digest, dest, file_mode):
1289 """Ensures file at |dest| has same content as cached |digest|.
1290
1291 If file_mode is provided, it is used to set the executable bit if
1292 applicable.
1293 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001294 raise NotImplementedError()
1295
1296
1297class MemoryCache(LocalCache):
1298 """LocalCache implementation that stores everything in memory."""
1299
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001300 def __init__(self, file_mode_mask=0500):
1301 """Args:
1302 file_mode_mask: bit mask to AND file mode with. Default value will make
1303 all mapped files to be read only.
1304 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001305 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001306 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001307 # Let's not assume dict is thread safe.
1308 self._lock = threading.Lock()
1309 self._contents = {}
1310
1311 def cached_set(self):
1312 with self._lock:
1313 return set(self._contents)
1314
1315 def touch(self, digest, size):
1316 with self._lock:
1317 return digest in self._contents
1318
1319 def evict(self, digest):
1320 with self._lock:
1321 self._contents.pop(digest, None)
1322
1323 def read(self, digest):
1324 with self._lock:
1325 return self._contents[digest]
1326
1327 def write(self, digest, content):
1328 # Assemble whole stream before taking the lock.
1329 data = ''.join(content)
1330 with self._lock:
1331 self._contents[digest] = data
1332
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001333 def hardlink(self, digest, dest, file_mode):
1334 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001335 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001336 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001337 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001338
1339
class CachePolicies(object):
  """Retention limits applied when trimming a local cache.

  Attributes:
    max_cache_size: trim if the cache gets larger than this value; if 0, the
        cache is effectively a leak.
    min_free_space: trim if disk free space becomes lower than this value; if
        0, it unconditionally fills the disk.
    max_items: maximum number of items to keep in the cache; if 0, do not
        enforce a limit.
  """

  def __init__(self, max_cache_size, min_free_space, max_items):
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1354
1355
1356class DiskCache(LocalCache):
1357 """Stateful LRU cache in a flat hash table in a directory.
1358
1359 Saves its state as json file.
1360 """
1361 STATE_FILE = 'state.json'
1362
1363 def __init__(self, cache_dir, policies, hash_algo):
1364 """
1365 Arguments:
1366 cache_dir: directory where to place the cache.
1367 policies: cache retention policies.
1368 algo: hashing algorithm used.
1369 """
1370 super(DiskCache, self).__init__()
1371 self.cache_dir = cache_dir
1372 self.policies = policies
1373 self.hash_algo = hash_algo
1374 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1375
1376 # All protected methods (starting with '_') except _path should be called
1377 # with this lock locked.
1378 self._lock = threading_utils.LockWithAssert()
1379 self._lru = lru.LRUDict()
1380
1381 # Profiling values.
1382 self._added = []
1383 self._removed = []
1384 self._free_disk = 0
1385
1386 with tools.Profiler('Setup'):
1387 with self._lock:
1388 self._load()
1389
1390 def __enter__(self):
1391 return self
1392
1393 def __exit__(self, _exc_type, _exec_value, _traceback):
1394 with tools.Profiler('CleanupTrimming'):
1395 with self._lock:
1396 self._trim()
1397
1398 logging.info(
1399 '%5d (%8dkb) added',
1400 len(self._added), sum(self._added) / 1024)
1401 logging.info(
1402 '%5d (%8dkb) current',
1403 len(self._lru),
1404 sum(self._lru.itervalues()) / 1024)
1405 logging.info(
1406 '%5d (%8dkb) removed',
1407 len(self._removed), sum(self._removed) / 1024)
1408 logging.info(
1409 ' %8dkb free',
1410 self._free_disk / 1024)
1411 return False
1412
1413 def cached_set(self):
1414 with self._lock:
1415 return self._lru.keys_set()
1416
1417 def touch(self, digest, size):
1418 """Verifies an actual file is valid.
1419
1420 Note that is doesn't compute the hash so it could still be corrupted if the
1421 file size didn't change.
1422
1423 TODO(maruel): More stringent verification while keeping the check fast.
1424 """
1425 # Do the check outside the lock.
1426 if not is_valid_file(self._path(digest), size):
1427 return False
1428
1429 # Update it's LRU position.
1430 with self._lock:
1431 if digest not in self._lru:
1432 return False
1433 self._lru.touch(digest)
1434 return True
1435
1436 def evict(self, digest):
1437 with self._lock:
1438 self._lru.pop(digest)
1439 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1440
1441 def read(self, digest):
1442 with open(self._path(digest), 'rb') as f:
1443 return f.read()
1444
1445 def write(self, digest, content):
1446 path = self._path(digest)
1447 # A stale broken file may remain. It is possible for the file to have write
1448 # access bit removed which would cause the file_write() call to fail to open
1449 # in write mode. Take no chance here.
1450 file_path.try_remove(path)
1451 try:
1452 size = file_write(path, content)
1453 except:
1454 # There are two possible places were an exception can occur:
1455 # 1) Inside |content| generator in case of network or unzipping errors.
1456 # 2) Inside file_write itself in case of disk IO errors.
1457 # In any case delete an incomplete file and propagate the exception to
1458 # caller, it will be logged there.
1459 file_path.try_remove(path)
1460 raise
1461 # Make the file read-only in the cache. This has a few side-effects since
1462 # the file node is modified, so every directory entries to this file becomes
1463 # read-only. It's fine here because it is a new file.
1464 file_path.set_read_only(path, True)
1465 with self._lock:
1466 self._add(digest, size)
1467
1468 def hardlink(self, digest, dest, file_mode):
1469 """Hardlinks the file to |dest|.
1470
1471 Note that the file permission bits are on the file node, not the directory
1472 entry, so changing the access bit on any of the directory entries for the
1473 file node will affect them all.
1474 """
1475 path = self._path(digest)
1476 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1477 file_path.hardlink(path, dest)
1478 if file_mode is not None:
1479 # Ignores all other bits.
1480 os.chmod(dest, file_mode & 0500)
1481
1482 def _load(self):
1483 """Loads state of the cache from json file."""
1484 self._lock.assert_locked()
1485
1486 if not os.path.isdir(self.cache_dir):
1487 os.makedirs(self.cache_dir)
1488 else:
1489 # Make sure the cache is read-only.
1490 # TODO(maruel): Calculate the cost and optimize the performance
1491 # accordingly.
1492 file_path.make_tree_read_only(self.cache_dir)
1493
1494 # Load state of the cache.
1495 if os.path.isfile(self.state_file):
1496 try:
1497 self._lru = lru.LRUDict.load(self.state_file)
1498 except ValueError as err:
1499 logging.error('Failed to load cache state: %s' % (err,))
1500 # Don't want to keep broken state file.
1501 file_path.try_remove(self.state_file)
1502
1503 # Ensure that all files listed in the state still exist and add new ones.
1504 previous = self._lru.keys_set()
1505 unknown = []
1506 for filename in os.listdir(self.cache_dir):
1507 if filename == self.STATE_FILE:
1508 continue
1509 if filename in previous:
1510 previous.remove(filename)
1511 continue
1512 # An untracked file.
1513 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1514 logging.warning('Removing unknown file %s from cache', filename)
1515 file_path.try_remove(self._path(filename))
1516 continue
1517 # File that's not referenced in 'state.json'.
1518 # TODO(vadimsh): Verify its SHA1 matches file name.
1519 logging.warning('Adding unknown file %s to cache', filename)
1520 unknown.append(filename)
1521
1522 if unknown:
1523 # Add as oldest files. They will be deleted eventually if not accessed.
1524 self._add_oldest_list(unknown)
1525 logging.warning('Added back %d unknown files', len(unknown))
1526
1527 if previous:
1528 # Filter out entries that were not found.
1529 logging.warning('Removed %d lost files', len(previous))
1530 for filename in previous:
1531 self._lru.pop(filename)
1532 self._trim()
1533
1534 def _save(self):
1535 """Saves the LRU ordering."""
1536 self._lock.assert_locked()
1537 if sys.platform != 'win32':
1538 d = os.path.dirname(self.state_file)
1539 if os.path.isdir(d):
1540 # Necessary otherwise the file can't be created.
1541 file_path.set_read_only(d, False)
1542 if os.path.isfile(self.state_file):
1543 file_path.set_read_only(self.state_file, False)
1544 self._lru.save(self.state_file)
1545
1546 def _trim(self):
1547 """Trims anything we don't know, make sure enough free space exists."""
1548 self._lock.assert_locked()
1549
1550 # Ensure maximum cache size.
1551 if self.policies.max_cache_size:
1552 total_size = sum(self._lru.itervalues())
1553 while total_size > self.policies.max_cache_size:
1554 total_size -= self._remove_lru_file()
1555
1556 # Ensure maximum number of items in the cache.
1557 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1558 for _ in xrange(len(self._lru) - self.policies.max_items):
1559 self._remove_lru_file()
1560
1561 # Ensure enough free space.
1562 self._free_disk = file_path.get_free_space(self.cache_dir)
1563 trimmed_due_to_space = False
1564 while (
1565 self.policies.min_free_space and
1566 self._lru and
1567 self._free_disk < self.policies.min_free_space):
1568 trimmed_due_to_space = True
1569 self._remove_lru_file()
1570 self._free_disk = file_path.get_free_space(self.cache_dir)
1571 if trimmed_due_to_space:
1572 total_usage = sum(self._lru.itervalues())
1573 usage_percent = 0.
1574 if total_usage:
1575 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1576 logging.warning(
1577 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1578 'cache (%.1f%% of its maximum capacity)',
1579 self._free_disk / 1024.,
1580 total_usage / 1024.,
1581 usage_percent)
1582 self._save()
1583
1584 def _path(self, digest):
1585 """Returns the path to one item."""
1586 return os.path.join(self.cache_dir, digest)
1587
1588 def _remove_lru_file(self):
1589 """Removes the last recently used file and returns its size."""
1590 self._lock.assert_locked()
1591 digest, size = self._lru.pop_oldest()
1592 self._delete_file(digest, size)
1593 return size
1594
1595 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1596 """Adds an item into LRU cache marking it as a newest one."""
1597 self._lock.assert_locked()
1598 if size == UNKNOWN_FILE_SIZE:
1599 size = os.stat(self._path(digest)).st_size
1600 self._added.append(size)
1601 self._lru.add(digest, size)
1602
1603 def _add_oldest_list(self, digests):
1604 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1605 self._lock.assert_locked()
1606 pairs = []
1607 for digest in digests:
1608 size = os.stat(self._path(digest)).st_size
1609 self._added.append(size)
1610 pairs.append((digest, size))
1611 self._lru.batch_insert_oldest(pairs)
1612
1613 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1614 """Deletes cache file from the file system."""
1615 self._lock.assert_locked()
1616 try:
1617 if size == UNKNOWN_FILE_SIZE:
1618 size = os.stat(self._path(digest)).st_size
1619 file_path.try_remove(self._path(digest))
1620 self._removed.append(size)
1621 except OSError as e:
1622 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1623
1624
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of arguments; empty until a fetched *.isolated
    # provides one.
    self.command = []
    # Maps relative file path -> properties dict, merged over all includes;
    # the first (leftmost) .isolated to mention a path wins.
    self.files = {}
    # Value of 'read_only' from the *.isolated data, or None if unset.
    self.read_only = None
    # Value of 'relative_cwd' from the *.isolated data; '' after fetch() if
    # no .isolated provided one.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.

    Arguments:
      fetch_queue: queue used to schedule fetches and wait on them (see
          FetchQueue); its cache is used to read back fetched *.isolated data.
      root_isolated_hash: hash digest of the root *.isolated file.
      algo: hashing algorithm used, passed to isolated_format.IsolatedFile.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Schedules a single *.isolated file for fetching at high priority.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.

    Arguments:
      isolated: a loaded IsolatedFile whose 'files' entries to schedule.
      fetch_queue: queue the data file fetches are added to.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        # Entries without 'h' (e.g. symlinks) have no blob to fetch.
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes). The first
    node to define each property wins.
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1746
1747
def set_storage_api_class(cls):
  """Installs |cls| as the StorageApi implementation used by default.

  May be called at most once (asserted), with a StorageApi subclass, before
  any storage object is created.
  """
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls
1754
1755
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of
        isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Local paths get the file system backend; anything URL-like goes to the
  # isolate server (or whatever implementation was installed via
  # set_storage_api_class).
  if not file_path.is_url(file_or_url):
    return FileSystem(file_or_url, namespace)
  cls = _storage_api_cls or IsolateServer
  return cls(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001778
1779
def get_storage(file_or_url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of
        isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  # Wrap the low-level StorageApi in the high-level Storage helper.
  storage_api = get_storage_api(file_or_url, namespace)
  return Storage(storage_api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001794
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001795
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Turn |infiles| into FileItem objects. Symlinks ('l' in metadata) carry no
  # content on the server side and duplicated paths only need one upload, so
  # both are skipped.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001825
1826
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError: if |isolated_hash| is neither a valid hash
        nor a readable local file, or if the cache is too small to hold all
        the requested files.
    isolated_format.IsolatedError: if |require_command| is True and no
        command was found in the fetched *.isolated files.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001914
1915
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  # Build relpath -> metadata dict, dropping the timestamp ('t') entry that
  # is not wanted in the resulting .isolated data.
  metadata = {}
  for relpath in paths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    meta.pop('t')
    metadata[relpath] = meta
  # Only entries with a hash ('h') represent actual content to upload;
  # .isolated files are marked high priority.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1937
1938
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|, in the same order.

  Raises:
    Error: if |files| contains duplicates, an entry is neither a file nor a
        directory, or processing an entry fails with an OSError.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          # Upload the directory's files plus the generated .isolated itself.
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          # The hash returned for a directory is the hash of its .isolated.
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Remove the temporary .isolated files, if any were generated; uploads
    # have completed (or failed) by this point.
    if tempdir:
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002008
2009
def archive(out, namespace, files, blacklist):
  """Uploads the given files/directories and prints '<hash> <path>' lines.

  Reads the list of paths from stdin when |files| is exactly ['-'].
  Raises Error when there is nothing to upload.
  """
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  decoded_files = [f.decode('utf-8') for f in files]
  compiled_blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    uploaded = archive_files_to_storage(storage, decoded_files,
                                        compiled_blacklist)
    print('\n'.join('%s %s' % (digest, path) for digest, path in uploaded))
2022
2023
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  # Aborts via parser.error() when --isolate-server is missing or malformed.
  process_isolate_server_options(parser, options)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Error carries a user-facing message; surface it through optparse.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002045
2046
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options)
  # --isolated and --file are mutually exclusive, and one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel; each pull() returns the digest of a completed fetch.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        # |rel| is already absolute because options.target was made absolute
        # above. The previous code joined options.target in a second time,
        # which was a no-op only because os.path.join() discards the first
        # argument when the second is absolute.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
2110
2111
def add_archive_options(parser):
  """Registers the --blacklist option used when archiving directories."""
  parser.add_option(
      '--blacklist',
      default=list(DEFAULT_BLACKLIST),
      action='append',
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2118
2119
def add_isolate_server_options(parser):
  """Registers the options used to locate the Isolate Server.

  Adds -I/--isolate-server (defaulting to the ISOLATE_SERVER environment
  variable) and --namespace (defaulting to 'default-gzip').
  """
  parser.add_option(
      '-I', '--isolate-server',
      default=os.environ.get('ISOLATE_SERVER', ''),
      metavar='URL',
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2131
2132
def process_isolate_server_options(parser, options):
  """Validates and normalizes --isolate-server, aborting when missing.

  Normalizes bare hostnames into proper URLs, strips trailing slashes and
  then logs in. Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')

  parts = urlparse.urlparse(options.isolate_server, 'https')
  if parts.query:
    parser.error("--isolate-server doesn't support query parameter.")
  if parts.fragment:
    parser.error("--isolate-server doesn't support fragment in the url.")
  pieces = list(parts)
  # urlparse('foo.com') yields netloc='', path='foo.com', which is not what is
  # desired here: move the path into the netloc slot so bare hosts work.
  if not pieces[1] and pieces[2]:
    pieces[1] = pieces[2].rstrip('/')
    pieces[2] = ''
  pieces[2] = pieces[2].rstrip('/')
  options.isolate_server = urlparse.urlunparse(pieces)
  on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002159
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002160
def add_cache_options(parser):
  """Registers the local-cache tuning options in a 'Cache management' group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  group.add_option(
      '--max-cache-size', type='int', metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space', type='int', metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items', type='int', metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2188
2189
def process_cache_options(options):
  """Builds the cache described by the --cache* options.

  Returns a DiskCache rooted at --cache when that flag is set, otherwise a
  MemoryCache.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      os.path.abspath(options.cache),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2202
2203
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser with logging and authentication options built in."""

  def __init__(self, **kwargs):
    # Derive the displayed program name from this module's file name.
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    # Validate the authentication options right after parsing.
    options, leftover = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, leftover
2218
2219
def main(args):
  """Runs the subcommand named in |args| and returns its exit code."""
  dispatcher = subcommand.CommandDispatcher(__name__)
  parser = OptionParserIsolateServer()
  return dispatcher.execute(parser, args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002223
2224
if __name__ == '__main__':
  # Set up the terminal environment before dispatching: make stdout/stderr
  # handle unicode consistently across platforms, disable output buffering so
  # progress is visible, and enable ANSI color support (notably on Windows).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))