blob: c0b5c820fab2688c1898042cce0eff534129ad21 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05008__version__ = '0.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040012import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000014import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040015import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050017import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000018import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000020import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050021import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040029from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040031from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040036import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Default regexps of file paths to ignore: editor/bytecode droppings and
# version control metadata directories.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
102
103
class Error(Exception):
  """Base exception for generic runtime failures raised by this module."""
107
108
class Aborted(Error):
  """Raised when a pending operation is cancelled (see Storage.abort())."""
112
113
def stream_read(stream, chunk_size):
  """Yields successive reads of up to |chunk_size| bytes from |stream|.

  Stops at the first empty (falsy) read, i.e. at end of stream.
  """
  chunk = stream.read(chunk_size)
  while chunk:
    yield chunk
    chunk = stream.read(chunk_size)
121
122
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Generator producing the content of |filepath| in |chunk_size| pieces.

  Reading starts at byte |offset| and continues until end of file.
  """
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
133
134
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  # EAFP instead of isdir()+makedirs(): the old check-then-create pattern was
  # racy when two writers created the same directory concurrently.
  try:
    os.makedirs(filedir)
  except OSError:
    # Fine if the directory already exists; re-raise any other failure.
    if not os.path.isdir(filedir):
      raise
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000153
154
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  |level| is the zlib compression level (0 = store, 9 = best).
  """
  compressor = zlib.compressobj(level)
  for piece in content_generator:
    out = compressor.compress(piece)
    if out:
      yield out
  # Flush whatever is buffered inside the compressor.
  trailing = compressor.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
165
166
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Passing |chunk_size| as max_length caps the output size; any input
      # not yet consumed is kept in decompressor.unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the unconsumed input in |chunk_size|-bounded slices.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Emit any bytes still buffered inside the decompressor.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
197
198
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'png'); strip the dot
  # so the membership test can actually match.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
204
205
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory mentioned by any file path.
  directories = set()
  for f in files:
    path = os.path.dirname(f)
    while path and path not in directories:
      directories.add(path)
      path = os.path.dirname(path)
  # Lexicographic order guarantees a parent sorts before its children, so
  # plain mkdir (not makedirs) is sufficient.
  for d in sorted(directories):
    os.mkdir(os.path.join(base_directory, d))
218
219
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files.

  Entries without an 'l' property are skipped; on Windows symlinks are only
  logged, not created.
  """
  for relpath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000232
233
def is_valid_file(filepath, size):
  """Checks whether the file at |filepath| appears valid.

  Validation is currently limited to comparing the on-disk size with |size|.
  """
  if size == UNKNOWN_FILE_SIZE:
    # Size is unknown: mere existence is the best check available.
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
248
249
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until prepare() computes it.
    self.digest = digest
    # Content size in bytes; None until prepare() computes it.
    self.size = size
    # High priority items are uploaded ahead of regular ones.
    self.high_priority = high_priority
    # Default zlib level; subclasses may override per content type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    num_bytes = 0
    for chunk in self.content():
      hasher.update(chunk)
      num_bytes += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = num_bytes
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000289
290
class FileItem(Item):
  """A file on disk to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Fall back to the on-disk size when the caller didn't supply one.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    # Already-compressed file types are stored without recompression.
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Lazily streams the file content from disk in chunks."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000308
309
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Digest is left to prepare(); size is simply the buffer length.
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    """Returns the whole buffer as a single-chunk iterable."""
    return [self.buffer]
319
320
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    # Whether the namespace implies zlib compression of stored content.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set by abort(); read without locking (see abort() for the rationale).
    self._aborted = False
    # Signal handlers replaced by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    # Redirect Ctrl+C / termination to abort() while this Storage is active.
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Restore the signal handlers saved in __enter__.
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Hand the zipped blob off to the network pool for the actual upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000646
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000647
def batch_items_for_check(items, batch_size_limits=None):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.
    batch_size_limits: optional list of increasing batch sizes; the i-th batch
        holds batch_size_limits[min(i, len(batch_size_limits) - 1)] items.
        Defaults to ITEMS_PER_CONTAINS_QUERIES. Parameterized so the growth
        schedule can be tuned and tested without touching the module global.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  if batch_size_limits is None:
    batch_size_limits = ITEMS_PER_CONTAINS_QUERIES
  batch_count = 0
  batch_size_limit = batch_size_limits[0]
  next_queries = []
  # Largest items first — presumably so their (slower) uploads can start as
  # early as possible; verify against callers before relying on this.
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      # Batches grow according to the schedule; the last limit repeats.
      batch_size_limit = batch_size_limits[
          min(batch_count, len(batch_size_limits) - 1)]
  if next_queries:
    # Flush the final, possibly short, batch.
    yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000674
675
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  Manages multiple concurrent fetch operations and acts as a bridge between
  Storage and LocalCache, so that neither has to depend on the other.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    # Digests with a fetch currently in flight.
    self._pending = set()
    # Every digest ever requested via add(); used by verify_all_cached().
    self._accessed = set()
    # Digests known to be present in |cache| already.
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # A fetch for this digest is already in flight.
    if digest in self._pending:
      return

    # Record the access, so verify_all_cached() can later ensure the item is
    # still present in the cache.
    self._accessed.add(digest)

    # Cache hit: refresh the item's LRU position and bail out, unless the
    # cached copy turns out to be corrupted ('touch' returns False then).
    if digest in self._fetched:
      if self.cache.touch(digest, size):
        return
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): Take free disk space, current cache size and the size of
    # the new item into account on every addition:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Kick off the download; the cache consumes the stream directly.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Blocks until at least one of |digests| has been retrieved.

    Returns the first digest retrieved.
    """
    # Some of them may already be in the cache.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Everything else must already have a fetch in flight.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Drain completion notifications until one of |digests| shows up.
    while self._pending:
      completed = self._channel.pull()
      self._pending.remove(completed)
      self._fetched.add(completed)
      if completed in digests:
        return completed

    # Unreachable: the assert above guarantees a hit before |_pending| drains.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Number of items still being fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if every item ever requested is still present in the cache."""
    return self._accessed.issubset(self.cache.cached_set())
771
772
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """
    Arguments:
      stream: an iterable yielding str chunks of the fetched item.
      expected_size: total size in bytes, or UNKNOWN_FILE_SIZE to skip the
          size check.
    """
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    else:
      # The stream produced no chunks at all. Previously the size check was
      # skipped entirely in this case, so a download truncated to zero bytes
      # passed verification silently. Inspect an empty final chunk so that a
      # nonzero |expected_size| is still enforced.
      self._inspect_chunk('', is_last=True)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
819
820
class StorageApi(object):
  """Interface for low-level remote storage operations.

  Implementations know nothing about compression or hashing schemes; those
  details are handled by the higher level Storage class. Clients should
  generally prefer Storage over a bare StorageApi, since Storage layers
  compression and upload optimizations on top of this interface.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    The namespace indirectly selects the hashing scheme and the compression
    method in use.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL, or None if the protocol doesn't support direct fetch URLs.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch at.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the bytes that go over the
    wire differ from the item's own data. In that case |content| is not None
    and yields chunks of compressed data (derived from item.content() as the
    source of the original uncompressed data). The Storage class implements
    this.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict mapping each missing Item to an opaque push state object to be
      passed to 'push'. See doc string for 'push'.
    """
    raise NotImplementedError()
905
906
class _IsolateServerPushState(object):
  """Bookkeeping for one item between IsolateServer.contains and .push.

  Defined at module scope (rather than nested inside IsolateServer) so that
  instances can be pickled.
  """

  def __init__(self, upload_url, finalize_url):
    # URL the item's content is PUT to.
    self.upload_url = upload_url
    # Optional URL POSTed to once the upload completes.
    self.finalize_url = finalize_url
    # Set to True after a successful upload, so a retry skips re-uploading.
    self.uploaded = False
    # Set to True after the finalize call succeeds.
    self.finalized = False
918
919
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    The response is cached in |_server_caps|; the handshake runs at most once.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
                exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    """Returns an URL to HTTP GET the content of hex |digest| directly."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """Yields chunks of the item with hex digest |digest|.

    Raises IOError if the request fails, or if a resumed fetch (offset != 0)
    returns a missing, malformed or mismatching Content-Range header.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = self.do_fetch(source_url, offset)
    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """Uploads |item| and optionally finalizes it, see StorageApi.push.

    Safe to retry: already-uploaded content is not re-sent, only the
    finalization call is repeated (tracked via |push_state| flags).
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # PUT file to |upload_url|.
      success = self.do_push(push_state.upload_url, content)
      if not success:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept properly data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """Checks which of |items| are missing, see StorageApi.contains.

    Returns:
      Dict of missing Item -> _IsolateServerPushState built from the upload
      URLs returned by the server's /pre-upload handler.
    """
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    # 'h', 's', 'i': digest, size and high-priority flag respectively.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        # Two URLs per missing item: upload URL and (optional) finalize URL.
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    return net.url_open(
        url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

  def do_push(self, url, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to upload the data to.
      content: an iterable that yields 'str' chunks.

    Returns:
      True on success, False on failure.
    """
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)
    response = net.url_read(
        url=url,
        data=content,
        content_type='application/octet-stream',
        method='PUT')
    return response is not None
1183 return response is not None
1184
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001185
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  Implementations may be called concurrently from multiple threads and must
  guard their internal state with a lock of their own.
  """
  # Directory backing the cache; None when the cache is not disk based.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has the same content as cached |digest|.

    If |file_mode| is provided, it is used to set the executable bit when
    applicable.
    """
    raise NotImplementedError()
1237
1238
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Plain dict guarded by a lock; dict thread safety is not assumed.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Join the stream outside the lock to keep the critical section short.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Data lives in memory, so a copy is written instead of a hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001280
1281
class CachePolicies(object):
  """Retention limits applied when trimming a cache.

  For each limit, 0 disables the corresponding constraint.
  """

  def __init__(self, max_cache_size, min_free_space, max_items):
    # Trim when the cache grows larger than this; 0 makes the cache
    # effectively a leak.
    self.max_cache_size = max_cache_size
    # Trim when disk free space drops below this; 0 unconditionally fills
    # the disk.
    self.min_free_space = min_free_space
    # Keep at most this many items; 0 enforces no limit.
    self.max_items = max_items
1296
1297
1298class DiskCache(LocalCache):
1299 """Stateful LRU cache in a flat hash table in a directory.
1300
1301 Saves its state as json file.
1302 """
1303 STATE_FILE = 'state.json'
1304
  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies (CachePolicies instance).
      hash_algo: hashing algorithm used to name cache entries.
    """
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    # Persisted LRU bookkeeping lives next to the cached files.
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)

    # All protected methods (starting with '_') except _path should be called
    # with this lock locked.
    self._lock = threading_utils.LockWithAssert()
    self._lru = lru.LRUDict()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    # Load previously saved cache state from disk, timed for profiling.
    with tools.Profiler('Setup'):
      with self._lock:
        self._load()
1331
1332 def __enter__(self):
1333 return self
1334
1335 def __exit__(self, _exc_type, _exec_value, _traceback):
1336 with tools.Profiler('CleanupTrimming'):
1337 with self._lock:
1338 self._trim()
1339
1340 logging.info(
1341 '%5d (%8dkb) added',
1342 len(self._added), sum(self._added) / 1024)
1343 logging.info(
1344 '%5d (%8dkb) current',
1345 len(self._lru),
1346 sum(self._lru.itervalues()) / 1024)
1347 logging.info(
1348 '%5d (%8dkb) removed',
1349 len(self._removed), sum(self._removed) / 1024)
1350 logging.info(
1351 ' %8dkb free',
1352 self._free_disk / 1024)
1353 return False
1354
1355 def cached_set(self):
1356 with self._lock:
1357 return self._lru.keys_set()
1358
1359 def touch(self, digest, size):
1360 """Verifies an actual file is valid.
1361
1362 Note that is doesn't compute the hash so it could still be corrupted if the
1363 file size didn't change.
1364
1365 TODO(maruel): More stringent verification while keeping the check fast.
1366 """
1367 # Do the check outside the lock.
1368 if not is_valid_file(self._path(digest), size):
1369 return False
1370
1371 # Update it's LRU position.
1372 with self._lock:
1373 if digest not in self._lru:
1374 return False
1375 self._lru.touch(digest)
1376 return True
1377
1378 def evict(self, digest):
1379 with self._lock:
1380 self._lru.pop(digest)
1381 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1382
1383 def read(self, digest):
1384 with open(self._path(digest), 'rb') as f:
1385 return f.read()
1386
1387 def write(self, digest, content):
1388 path = self._path(digest)
1389 # A stale broken file may remain. It is possible for the file to have write
1390 # access bit removed which would cause the file_write() call to fail to open
1391 # in write mode. Take no chance here.
1392 file_path.try_remove(path)
1393 try:
1394 size = file_write(path, content)
1395 except:
1396 # There are two possible places were an exception can occur:
1397 # 1) Inside |content| generator in case of network or unzipping errors.
1398 # 2) Inside file_write itself in case of disk IO errors.
1399 # In any case delete an incomplete file and propagate the exception to
1400 # caller, it will be logged there.
1401 file_path.try_remove(path)
1402 raise
1403 # Make the file read-only in the cache. This has a few side-effects since
1404 # the file node is modified, so every directory entries to this file becomes
1405 # read-only. It's fine here because it is a new file.
1406 file_path.set_read_only(path, True)
1407 with self._lock:
1408 self._add(digest, size)
1409
1410 def hardlink(self, digest, dest, file_mode):
1411 """Hardlinks the file to |dest|.
1412
1413 Note that the file permission bits are on the file node, not the directory
1414 entry, so changing the access bit on any of the directory entries for the
1415 file node will affect them all.
1416 """
1417 path = self._path(digest)
1418 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1419 file_path.hardlink(path, dest)
1420 if file_mode is not None:
1421 # Ignores all other bits.
1422 os.chmod(dest, file_mode & 0500)
1423
1424 def _load(self):
1425 """Loads state of the cache from json file."""
1426 self._lock.assert_locked()
1427
1428 if not os.path.isdir(self.cache_dir):
1429 os.makedirs(self.cache_dir)
1430 else:
1431 # Make sure the cache is read-only.
1432 # TODO(maruel): Calculate the cost and optimize the performance
1433 # accordingly.
1434 file_path.make_tree_read_only(self.cache_dir)
1435
1436 # Load state of the cache.
1437 if os.path.isfile(self.state_file):
1438 try:
1439 self._lru = lru.LRUDict.load(self.state_file)
1440 except ValueError as err:
1441 logging.error('Failed to load cache state: %s' % (err,))
1442 # Don't want to keep broken state file.
1443 file_path.try_remove(self.state_file)
1444
1445 # Ensure that all files listed in the state still exist and add new ones.
1446 previous = self._lru.keys_set()
1447 unknown = []
1448 for filename in os.listdir(self.cache_dir):
1449 if filename == self.STATE_FILE:
1450 continue
1451 if filename in previous:
1452 previous.remove(filename)
1453 continue
1454 # An untracked file.
1455 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1456 logging.warning('Removing unknown file %s from cache', filename)
1457 file_path.try_remove(self._path(filename))
1458 continue
1459 # File that's not referenced in 'state.json'.
1460 # TODO(vadimsh): Verify its SHA1 matches file name.
1461 logging.warning('Adding unknown file %s to cache', filename)
1462 unknown.append(filename)
1463
1464 if unknown:
1465 # Add as oldest files. They will be deleted eventually if not accessed.
1466 self._add_oldest_list(unknown)
1467 logging.warning('Added back %d unknown files', len(unknown))
1468
1469 if previous:
1470 # Filter out entries that were not found.
1471 logging.warning('Removed %d lost files', len(previous))
1472 for filename in previous:
1473 self._lru.pop(filename)
1474 self._trim()
1475
1476 def _save(self):
1477 """Saves the LRU ordering."""
1478 self._lock.assert_locked()
1479 if sys.platform != 'win32':
1480 d = os.path.dirname(self.state_file)
1481 if os.path.isdir(d):
1482 # Necessary otherwise the file can't be created.
1483 file_path.set_read_only(d, False)
1484 if os.path.isfile(self.state_file):
1485 file_path.set_read_only(self.state_file, False)
1486 self._lru.save(self.state_file)
1487
1488 def _trim(self):
1489 """Trims anything we don't know, make sure enough free space exists."""
1490 self._lock.assert_locked()
1491
1492 # Ensure maximum cache size.
1493 if self.policies.max_cache_size:
1494 total_size = sum(self._lru.itervalues())
1495 while total_size > self.policies.max_cache_size:
1496 total_size -= self._remove_lru_file()
1497
1498 # Ensure maximum number of items in the cache.
1499 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1500 for _ in xrange(len(self._lru) - self.policies.max_items):
1501 self._remove_lru_file()
1502
1503 # Ensure enough free space.
1504 self._free_disk = file_path.get_free_space(self.cache_dir)
1505 trimmed_due_to_space = False
1506 while (
1507 self.policies.min_free_space and
1508 self._lru and
1509 self._free_disk < self.policies.min_free_space):
1510 trimmed_due_to_space = True
1511 self._remove_lru_file()
1512 self._free_disk = file_path.get_free_space(self.cache_dir)
1513 if trimmed_due_to_space:
1514 total_usage = sum(self._lru.itervalues())
1515 usage_percent = 0.
1516 if total_usage:
1517 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1518 logging.warning(
1519 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1520 'cache (%.1f%% of its maximum capacity)',
1521 self._free_disk / 1024.,
1522 total_usage / 1024.,
1523 usage_percent)
1524 self._save()
1525
1526 def _path(self, digest):
1527 """Returns the path to one item."""
1528 return os.path.join(self.cache_dir, digest)
1529
1530 def _remove_lru_file(self):
1531 """Removes the last recently used file and returns its size."""
1532 self._lock.assert_locked()
1533 digest, size = self._lru.pop_oldest()
1534 self._delete_file(digest, size)
1535 return size
1536
1537 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1538 """Adds an item into LRU cache marking it as a newest one."""
1539 self._lock.assert_locked()
1540 if size == UNKNOWN_FILE_SIZE:
1541 size = os.stat(self._path(digest)).st_size
1542 self._added.append(size)
1543 self._lru.add(digest, size)
1544
1545 def _add_oldest_list(self, digests):
1546 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1547 self._lock.assert_locked()
1548 pairs = []
1549 for digest in digests:
1550 size = os.stat(self._path(digest)).st_size
1551 self._added.append(size)
1552 pairs.append((digest, size))
1553 self._lru.batch_insert_oldest(pairs)
1554
1555 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1556 """Deletes cache file from the file system."""
1557 self._lock.assert_locked()
1558 try:
1559 if size == UNKNOWN_FILE_SIZE:
1560 size = os.stat(self._path(digest)).st_size
1561 file_path.try_remove(self._path(digest))
1562 self._removed.append(size)
1563 except OSError as e:
1564 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1565
1566
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001567class IsolatedBundle(object):
1568 """Fetched and parsed .isolated file with all dependencies."""
1569
Vadim Shtayura3148e072014-09-02 18:51:52 -07001570 def __init__(self):
1571 self.command = []
1572 self.files = {}
1573 self.read_only = None
1574 self.relative_cwd = None
1575 # The main .isolated file, a IsolatedFile instance.
1576 self.root = None
1577
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001578 def fetch(self, fetch_queue, root_isolated_hash, algo):
1579 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001580
1581 It enables support for "included" .isolated files. They are processed in
1582 strict order but fetched asynchronously from the cache. This is important so
1583 that a file in an included .isolated file that is overridden by an embedding
1584 .isolated file is not fetched needlessly. The includes are fetched in one
1585 pass and the files are fetched as soon as all the ones on the left-side
1586 of the tree were fetched.
1587
1588 The prioritization is very important here for nested .isolated files.
1589 'includes' have the highest priority and the algorithm is optimized for both
1590 deep and wide trees. A deep one is a long link of .isolated files referenced
1591 one at a time by one item in 'includes'. A wide one has a large number of
1592 'includes' in a single .isolated file. 'left' is defined as an included
1593 .isolated file earlier in the 'includes' list. So the order of the elements
1594 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001595
1596 As a side effect this method starts asynchronous fetch of all data files
1597 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1598 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001599 """
1600 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1601
1602 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1603 pending = {}
1604 # Set of hashes of already retrieved items to refuse recursive includes.
1605 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001606 # Set of IsolatedFile's whose data files have already being fetched.
1607 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001608
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001609 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001610 h = isolated_file.obj_hash
1611 if h in seen:
1612 raise isolated_format.IsolatedError(
1613 'IsolatedFile %s is retrieved recursively' % h)
1614 assert h not in pending
1615 seen.add(h)
1616 pending[h] = isolated_file
1617 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1618
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001619 # Start fetching root *.isolated file (single file, not the whole bundle).
1620 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001621
1622 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001623 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001624 item_hash = fetch_queue.wait(pending)
1625 item = pending.pop(item_hash)
1626 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001627
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001628 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001629 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001630 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001631
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001632 # Always fetch *.isolated files in traversal order, waiting if necessary
1633 # until next to-be-processed node loads. "Waiting" is done by yielding
1634 # back to the outer loop, that waits until some *.isolated is loaded.
1635 for node in isolated_format.walk_includes(self.root):
1636 if node not in processed:
1637 # Not visited, and not yet loaded -> wait for it to load.
1638 if not node.is_loaded:
1639 break
1640 # Not visited and loaded -> process it and continue the traversal.
1641 self._start_fetching_files(node, fetch_queue)
1642 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001643
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001644 # All *.isolated files should be processed by now and only them.
1645 all_isolateds = set(isolated_format.walk_includes(self.root))
1646 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001647
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001648 # Extract 'command' and other bundle properties.
1649 for node in isolated_format.walk_includes(self.root):
1650 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001651 self.relative_cwd = self.relative_cwd or ''
1652
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001653 def _start_fetching_files(self, isolated, fetch_queue):
1654 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001655
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001656 Modifies self.files.
1657 """
1658 logging.debug('fetch_files(%s)', isolated.obj_hash)
1659 for filepath, properties in isolated.data.get('files', {}).iteritems():
1660 # Root isolated has priority on the files being mapped. In particular,
1661 # overridden files must not be fetched.
1662 if filepath not in self.files:
1663 self.files[filepath] = properties
1664 if 'h' in properties:
1665 # Preemptively request files.
1666 logging.debug('fetching %s', filepath)
1667 fetch_queue.add(
1668 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1669
1670 def _update_self(self, node):
1671 """Extracts bundle global parameters from loaded *.isolated file.
1672
1673 Will be called with each loaded *.isolated file in order of traversal of
1674 isolated include graph (see isolated_format.walk_includes).
1675 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001676 # Grabs properties.
1677 if not self.command and node.data.get('command'):
1678 # Ensure paths are correctly separated on windows.
1679 self.command = node.data['command']
1680 if self.command:
1681 self.command[0] = self.command[0].replace('/', os.path.sep)
1682 self.command = tools.fix_python_path(self.command)
1683 if self.read_only is None and node.data.get('read_only') is not None:
1684 self.read_only = node.data['read_only']
1685 if (self.relative_cwd is None and
1686 node.data.get('relative_cwd') is not None):
1687 self.relative_cwd = node.data['relative_cwd']
1688
1689
Vadim Shtayura8623c272014-12-01 11:45:27 -08001690def set_storage_api_class(cls):
1691 """Replaces StorageApi implementation used by default."""
1692 global _storage_api_cls
1693 assert _storage_api_cls is None
1694 assert issubclass(cls, StorageApi)
1695 _storage_api_cls = cls
1696
1697
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001698def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001699 """Returns an object that implements low-level StorageApi interface.
1700
1701 It is used by Storage to work with single isolate |namespace|. It should
1702 rarely be used directly by clients, see 'get_storage' for
1703 a better alternative.
1704
1705 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001706 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001707 namespace: isolate namespace to operate in, also defines hashing and
1708 compression scheme used, i.e. namespace names that end with '-gzip'
1709 store compressed data.
1710
1711 Returns:
1712 Instance of StorageApi subclass.
1713 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001714 cls = _storage_api_cls or IsolateServer
1715 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001716
1717
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001718def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001719 """Returns Storage class that can upload and download from |namespace|.
1720
1721 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001722 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001723 namespace: isolate namespace to operate in, also defines hashing and
1724 compression scheme used, i.e. namespace names that end with '-gzip'
1725 store compressed data.
1726
1727 Returns:
1728 Instance of Storage.
1729 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001730 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001731
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001732
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001733def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001734 """Uploads the given tree to the given url.
1735
1736 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001737 base_url: The url of the isolate server to upload to.
1738 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001739 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001740 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001741 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001742 # Filter out symlinks, since they are not represented by items on isolate
1743 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001744 items = []
1745 seen = set()
1746 skipped = 0
1747 for filepath, metadata in infiles:
1748 if 'l' not in metadata and filepath not in seen:
1749 seen.add(filepath)
1750 item = FileItem(
1751 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001752 digest=metadata['h'],
1753 size=metadata['s'],
1754 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001755 items.append(item)
1756 else:
1757 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001758
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001759 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001760 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001761 storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001762
1763
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001764def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001765 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001766
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001767 Arguments:
1768 isolated_hash: hash of the root *.isolated file.
1769 storage: Storage class that communicates with isolate storage.
1770 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001771 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001772 require_command: Ensure *.isolated specifies a command to run.
1773
1774 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001775 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001776 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001777 logging.debug(
1778 'fetch_isolated(%s, %s, %s, %s, %s)',
1779 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001780 # Hash algorithm to use, defined by namespace |storage| is using.
1781 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001782 with cache:
1783 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001784 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001785
1786 with tools.Profiler('GetIsolateds'):
1787 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001788 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001789 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1790 try:
1791 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1792 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001793 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001794 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1795 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001796
1797 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001798 bundle.fetch(fetch_queue, isolated_hash, algo)
1799 if require_command and not bundle.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001800 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1801 # easy way to cancel them.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001802 raise isolated_format.IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001803
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001804 with tools.Profiler('GetRest'):
1805 # Create file system hierarchy.
1806 if not os.path.isdir(outdir):
1807 os.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001808 create_directories(outdir, bundle.files)
1809 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001810
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001811 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001812 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001813 if not os.path.isdir(cwd):
1814 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001815
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001816 # Multimap: digest -> list of pairs (path, props).
1817 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001818 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001819 if 'h' in props:
1820 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001821
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001822 # Now block on the remaining files to be downloaded and mapped.
1823 logging.info('Retrieving remaining files (%d of them)...',
1824 fetch_queue.pending_count)
1825 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001826 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001827 while remaining:
1828 detector.ping()
1829
1830 # Wait for any item to finish fetching to cache.
1831 digest = fetch_queue.wait(remaining)
1832
1833 # Link corresponding files to a fetched item in cache.
1834 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001835 cache.hardlink(
1836 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001837
1838 # Report progress.
1839 duration = time.time() - last_update
1840 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1841 msg = '%d files remaining...' % len(remaining)
1842 print msg
1843 logging.info(msg)
1844 last_update = time.time()
1845
1846 # Cache could evict some items we just tried to fetch, it's a fatal error.
1847 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001848 raise isolated_format.MappingError(
1849 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001850 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001851
1852
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001853def directory_to_metadata(root, algo, blacklist):
1854 """Returns the FileItem list and .isolated metadata for a directory."""
1855 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001856 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001857 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001858 metadata = {
1859 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001860 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001861 for relpath in paths
1862 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001863 for v in metadata.itervalues():
1864 v.pop('t')
1865 items = [
1866 FileItem(
1867 path=os.path.join(root, relpath),
1868 digest=meta['h'],
1869 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001870 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001871 for relpath, meta in metadata.iteritems() if 'h' in meta
1872 ]
1873 return items, metadata
1874
1875
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001876def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001877 """Stores every entries and returns the relevant data.
1878
1879 Arguments:
1880 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001881 files: list of file paths to upload. If a directory is specified, a
1882 .isolated file is created and its hash is returned.
1883 blacklist: function that returns True if a file should be omitted.
1884 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001885 assert all(isinstance(i, unicode) for i in files), files
1886 if len(files) != len(set(map(os.path.abspath, files))):
1887 raise Error('Duplicate entries found.')
1888
1889 results = []
1890 # The temporary directory is only created as needed.
1891 tempdir = None
1892 try:
1893 # TODO(maruel): Yield the files to a worker thread.
1894 items_to_upload = []
1895 for f in files:
1896 try:
1897 filepath = os.path.abspath(f)
1898 if os.path.isdir(filepath):
1899 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001900 items, metadata = directory_to_metadata(
1901 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001902
1903 # Create the .isolated file.
1904 if not tempdir:
1905 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1906 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1907 os.close(handle)
1908 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001909 'algo':
1910 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001911 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001912 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001913 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001914 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001915 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001916 items_to_upload.extend(items)
1917 items_to_upload.append(
1918 FileItem(
1919 path=isolated,
1920 digest=h,
1921 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001922 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001923 results.append((h, f))
1924
1925 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001926 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001927 items_to_upload.append(
1928 FileItem(
1929 path=filepath,
1930 digest=h,
1931 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001932 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001933 results.append((h, f))
1934 else:
1935 raise Error('%s is neither a file or directory.' % f)
1936 except OSError:
1937 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001938 # Technically we would care about which files were uploaded but we don't
1939 # much in practice.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001940 _uploaded_files = storage.upload_items(items_to_upload)
1941 return results
1942 finally:
1943 if tempdir:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001944 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001945
1946
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001947def archive(out, namespace, files, blacklist):
1948 if files == ['-']:
1949 files = sys.stdin.readlines()
1950
1951 if not files:
1952 raise Error('Nothing to upload')
1953
1954 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001955 blacklist = tools.gen_blacklist(blacklist)
1956 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001957 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001958 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1959
1960
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001961@subcommand.usage('<file1..fileN> or - to read from stdin')
1962def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001963 """Archives data to the server.
1964
1965 If a directory is specified, a .isolated file is created the whole directory
1966 is uploaded. Then this .isolated file can be included in another one to run
1967 commands.
1968
1969 The commands output each file that was processed with its content hash. For
1970 directories, the .isolated generated for the directory is listed as the
1971 directory entry itself.
1972 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001973 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001974 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001975 options, files = parser.parse_args(args)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001976 process_isolate_server_options(parser, options, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001977 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001978 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001979 except Error as e:
1980 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001981 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001982
1983
1984def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001985 """Download data from the server.
1986
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001987 It can either download individual files or a complete tree from a .isolated
1988 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001989 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001990 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001991 parser.add_option(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001992 '-i', '--isolated', metavar='HASH',
1993 help='hash of an isolated file, .isolated file content is discarded, use '
1994 '--file if you need it')
1995 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001996 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1997 help='hash and destination of a file, can be used multiple times')
1998 parser.add_option(
1999 '-t', '--target', metavar='DIR', default=os.getcwd(),
2000 help='destination directory')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002001 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002002 options, args = parser.parse_args(args)
2003 if args:
2004 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002005
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002006 process_isolate_server_options(parser, options, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002007 if bool(options.isolated) == bool(options.file):
2008 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002009
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002010 cache = process_cache_options(options)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002011 options.target = os.path.abspath(options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002012 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002013 # Fetching individual files.
2014 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002015 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002016 channel = threading_utils.TaskChannel()
2017 pending = {}
2018 for digest, dest in options.file:
2019 pending[digest] = dest
2020 storage.async_fetch(
2021 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002022 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002023 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002024 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002025 functools.partial(file_write, os.path.join(options.target, dest)))
2026 while pending:
2027 fetched = channel.pull()
2028 dest = pending.pop(fetched)
2029 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002030
Vadim Shtayura3172be52013-12-03 12:49:05 -08002031 # Fetching whole isolated tree.
2032 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002033 with cache:
2034 bundle = fetch_isolated(
2035 isolated_hash=options.isolated,
2036 storage=storage,
2037 cache=cache,
2038 outdir=options.target,
2039 require_command=False)
2040 if bundle.command:
2041 rel = os.path.join(options.target, bundle.relative_cwd)
2042 print('To run this test please run from the directory %s:' %
2043 os.path.join(options.target, rel))
2044 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002045
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002046 return 0
2047
2048
def add_archive_options(parser):
  """Registers the upload-related options on |parser|."""
  # Copy DEFAULT_BLACKLIST so 'append' does not mutate the module-level tuple.
  parser.add_option(
      '--blacklist',
      action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2055
2056
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  # -I defaults to the ISOLATE_SERVER environment variable when present.
  parser.add_option(
      '-I', '--isolate-server',
      default=os.environ.get('ISOLATE_SERVER', ''),
      metavar='URL',
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2068
2069
def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Normalizes options.isolate_server in place via net.fix_url(). Returns the
  identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as exc:
    parser.error('--isolate-server %s' % exc)
  if set_exception_handler:
    # Route uncaught-exception reports to this server.
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002087
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002088
def add_cache_options(parser):
  """Registers the local cache management option group on |parser|."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # Trimming thresholds; values are in bytes for sizes, a count for items.
  group.add_option(
      '--max-cache-size',
      metavar='NNN',
      type='int',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      metavar='NNN',
      type='int',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      metavar='NNN',
      type='int',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2116
2117
def process_cache_options(options):
  """Returns the content cache selected by the parsed cache options.

  With --cache set, returns a DiskCache rooted at that directory; otherwise a
  MemoryCache that keeps nothing on disk.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      os.path.abspath(options.cache),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2130
2131
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that pre-registers logging and authentication options."""

  def __init__(self, **kwargs):
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    # Let the base class parse first, then consume the auth options.
    opts, leftover = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, opts)
    return opts, leftover
2146
2147
def main(args):
  """Dispatches |args| to the matching CMD* handler in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002151
2152
if __name__ == '__main__':
  # Presumably normalizes stdout/stderr encoding before any output — the
  # helper lives in depot_tools; confirm there if behavior matters.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # colorama enables ANSI color escape handling (notably on Windows consoles).
  colorama.init()
  sys.exit(main(sys.argv[1:]))