blob: c469cc7d9ba029613f35f7b41737d84a26a71f2b [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
aludwin81178302016-11-30 17:18:49 -08008__version__ = '0.8.0'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
nodir90bc8dc2016-06-15 13:35:21 -070010import errno
tansell9e04a8d2016-07-28 09:31:59 -070011import functools
12import io
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040014import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000016import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040017import signal
tansell9e04a8d2016-07-28 09:31:59 -070018import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import sys
tansell26de79e2016-11-13 18:41:11 -080020import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050021import tempfile
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000023import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000025from third_party import colorama
26from third_party.depot_tools import fix_encoding
27from third_party.depot_tools import subcommand
28
tanselle4288c32016-07-28 09:45:40 -070029from libs import arfile
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
maruel12e30012015-10-09 11:55:35 -070031from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040032from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040033from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000034from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040035from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070036from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000037from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000038from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000039
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080040import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040041import isolated_format
aludwin81178302016-11-30 17:18:49 -080042import isolate_storage
43from isolate_storage import Item
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080044
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000045
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload', then
# the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a
# trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousands files to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Default regexps of paths that should not be archived.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
95
96
class Error(Exception):
  """Generic runtime error raised by this module."""
100
101
class Aborted(Error):
  """Raised when an operation is aborted (e.g. via Ctrl+C)."""
105
106
class AlreadyExists(Error):
  """Raised when a file that is about to be created already exists."""
110
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of the file at |path| in |chunk_size| pieces.

  Reading starts at byte |offset| and continues until EOF.
  """
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    # iter() with a sentinel stops as soon as read() returns an empty string.
    for data in iter(lambda: stream.read(chunk_size), ''):
      yield data
121
122
def file_write(path, content_generator):
  """Writes the chunks yielded by |content_generator| to the file at |path|.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000139
140
def fileobj_path(fileobj):
  """Returns the file system path backing |fileobj|, or None.

  A path is only returned when it is absolute and exists on disk, so the
  result can safely be handed to file system operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file like object was created using something like open("test.txt")
  # name will end up being a str (such as a function outside our control, like
  # the standard library). We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error, so bail out early for relative paths.
  if not os.path.isabs(name):
    return None

  return name if fs.exists(name) else None
165
166
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj into dstfileobj.

  When |size| is given, exactly that many bytes are copied and an IOError is
  raised if the source runs out of data early. Otherwise everything up to the
  EOF marker is copied.
  """
  # With no explicit size, the source must be positioned at its start so the
  # EOF-based loop below copies the whole file.
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    want = chunk_size if size <= 0 else min(chunk_size, size - copied)
    chunk = srcfileobj.read(want)
    if not chunk:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(chunk)
    copied += len(chunk)
194
195
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # The file is read only when no write permission bit is set for anyone.
    # BUG FIX: the 'not' was missing, which inverted the test: writable files
    # were linked (risking cache corruption through the shared inode) and
    # read-only files were needlessly copied.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
258
259
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the chunks in |content_generator|."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  remainder = deflater.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
270
271
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Yields decompressed data for the zlib stream in |content_generator|.

  Output is produced in pieces no larger than |chunk_size| so that a zip bomb
  cannot force zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  seen = 0
  try:
    for chunk in content_generator:
      seen += len(chunk)
      piece = inflater.decompress(chunk, chunk_size)
      if piece:
        yield piece
      # Keep draining until zlib has consumed the whole input chunk; each
      # call emits at most |chunk_size| bytes.
      while inflater.unconsumed_tail:
        piece = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if piece:
          yield piece
    leftover = inflater.flush()
    if leftover:
      yield leftover
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (seen, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
302
303
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot ('.png') while the entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('png'). BUG FIX: strip the
  # dot; previously the membership test could never match, so pre-compressed
  # files were recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
309
310
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  dirs = set()
  for f in files:
    d = os.path.dirname(f)
    while d and d not in dirs:
      dirs.add(d)
      d = os.path.dirname(d)
  # Sorted order guarantees a parent is created before its children.
  for d in sorted(dirs):
    abs_d = os.path.join(base_directory, d)
    if not fs.isdir(abs_d):
      fs.mkdir(abs_d)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000325
326
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    link_target = properties['l']
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(link_target, outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % outfile)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000343
344
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), actual_size, size)
  return False
365
366
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Derive the size from disk only when the caller did not supply it.
    actual_size = fs.stat(path).st_size if size is None else size
    super(FileItem, self).__init__(digest, actual_size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Stream the file lazily, chunk by chunk.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000384
385
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is always known up front: it is simply the buffer length. Digest is
    # left to be computed lazily (passed as None).
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # Single-chunk iterable, as expected by the Storage upload code.
    return [self.buffer]
395
396
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000397class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800398 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000399
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800400 Implements compression support, parallel 'contains' checks, parallel uploads
401 and more.
402
403 Works only within single namespace (and thus hashing algorithm and compression
404 scheme are fixed).
405
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400406 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
407 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800408 """
409
  def __init__(self, storage_api):
    self._storage_api = storage_api
    # Compression and hashing scheme are both fully determined by the
    # namespace of the underlying StorageApi.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily on first use; see cpu_thread_pool and
    # net_thread_pool properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # See abort() and __enter__/__exit__ for Ctrl+C handling.
    self._aborted = False
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000419
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo
427
  @property
  def location(self):
    """URL of the backing store that this class is using."""
    # Delegates to the underlying StorageApi instance.
    return self._storage_api.location
432
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace
440
441 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000442 def cpu_thread_pool(self):
443 """ThreadPool for CPU-bound tasks like zipping."""
444 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500445 threads = max(threading_utils.num_processors(), 2)
446 if sys.maxsize <= 2L**32:
447 # On 32 bits userland, do not try to use more than 16 threads.
448 threads = min(threads, 16)
449 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000450 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000451
  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError.

    Created lazily on first access; torn down in close().
    """
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000458
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000459 def close(self):
460 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400461 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000462 if self._cpu_thread_pool:
463 self._cpu_thread_pool.join()
464 self._cpu_thread_pool.close()
465 self._cpu_thread_pool = None
466 if self._net_thread_pool:
467 self._net_thread_pool.join()
468 self._net_thread_pool.close()
469 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400470 logging.info('Done.')
471
  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000481
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000482 def __enter__(self):
483 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400484 assert not self._prev_sig_handlers, self._prev_sig_handlers
485 for s in (signal.SIGINT, signal.SIGTERM):
486 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000487 return self
488
489 def __exit__(self, _exc_type, _exc_value, _traceback):
490 """Context manager interface."""
491 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400492 while self._prev_sig_handlers:
493 s, h = self._prev_sig_handlers.popitem()
494 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000495 return False
496
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000497 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800498 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000499
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800500 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000501
502 Arguments:
503 items: list of Item instances that represents data to upload.
504
505 Returns:
506 List of items that were uploaded. All other items are already there.
507 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700508 logging.info('upload_items(items=%d)', len(items))
509
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800510 # Ensure all digests are calculated.
511 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700512 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800513
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000514 # For each digest keep only first Item that matches it. All other items
515 # are just indistinguishable copies from the point of view of isolate
516 # server (it doesn't care about paths at all, only content and digests).
517 seen = {}
518 duplicates = 0
519 for item in items:
520 if seen.setdefault(item.digest, item) is not item:
521 duplicates += 1
522 items = seen.values()
523 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700524 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000525
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000526 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000527 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000528 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800529 channel = threading_utils.TaskChannel()
530 for missing_item, push_state in self.get_missing_items(items):
531 missing.add(missing_item)
532 self.async_push(channel, missing_item, push_state)
533
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000534 # No need to spawn deadlock detector thread if there's nothing to upload.
535 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700536 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000537 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000538 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000539 detector.ping()
540 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000541 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000542 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000543 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000544 logging.info('All files are uploaded')
545
546 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000547 total = len(items)
548 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000549 logging.info(
550 'Total: %6d, %9.1fkb',
551 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000552 total_size / 1024.)
553 cache_hit = set(items) - missing
554 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000555 logging.info(
556 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
557 len(cache_hit),
558 cache_hit_size / 1024.,
559 len(cache_hit) * 100. / total,
560 cache_hit_size * 100. / total_size if total_size else 0)
561 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000562 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000563 logging.info(
564 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
565 len(cache_miss),
566 cache_miss_size / 1024.,
567 len(cache_miss) * 100. / total,
568 cache_miss_size * 100. / total_size if total_size else 0)
569
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000570 return uploaded
571
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      # Check the abort flag as late as possible to avoid useless work.
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      # Hand the compressed blob to the network pool as a one-chunk iterable.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000623
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800624 def push(self, item, push_state):
625 """Synchronously pushes a single item to the server.
626
627 If you need to push many items at once, consider using 'upload_items' or
628 'async_push' with instance of TaskChannel.
629
630 Arguments:
631 item: item to upload as instance of Item class.
632 push_state: push state returned by 'get_missing_items' call for |item|.
633
634 Returns:
635 Pushed item (same object as |item|).
636 """
637 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700638 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800639 self.async_push(channel, item, push_state)
640 pushed = channel.pull()
641 assert pushed is item
642 return item
643
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000644 def async_fetch(self, channel, priority, digest, size, sink):
645 """Starts asynchronous fetch from the server in a parallel thread.
646
647 Arguments:
648 channel: TaskChannel that receives back |digest| when download ends.
649 priority: thread pool task priority for the fetch.
650 digest: hex digest of an item to download.
651 size: expected size of the item (after decompression).
652 sink: function that will be called as sink(generator).
653 """
654 def fetch():
655 try:
656 # Prepare reading pipeline.
657 stream = self._storage_api.fetch(digest)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700658 if self._use_zip:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400659 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000660 # Run |stream| through verifier that will assert its size.
661 verifier = FetchStreamVerifier(stream, size)
662 # Verified stream goes to |sink|.
663 sink(verifier.run())
664 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800665 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000666 raise
667 return digest
668
669 # Don't bother with zip_thread_pool for decompression. Decompression is
670 # really fast and most probably IO bound anyway.
671 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
672
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000673 def get_missing_items(self, items):
674 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000675
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000676 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000677
678 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000679 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000680
681 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800682 For each missing item it yields a pair (item, push_state), where:
683 * item - Item object that is missing (one of |items|).
684 * push_state - opaque object that contains storage specific information
685 describing how to upload the item (for example in case of cloud
686 storage, it is signed upload URLs). It can later be passed to
687 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000688 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000689 channel = threading_utils.TaskChannel()
690 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800691
692 # Ensure all digests are calculated.
693 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700694 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800695
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400696 def contains(batch):
697 if self._aborted:
698 raise Aborted()
699 return self._storage_api.contains(batch)
700
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000701 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800702 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400703 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400704 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000705 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800706
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000707 # Yield results as they come in.
708 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800709 for missing_item, push_state in channel.pull().iteritems():
710 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000711
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000712
def batch_items_for_check(items, batch_size_limits=None):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.
    batch_size_limits: optional sequence of progressively growing batch size
        limits; the last value keeps being reused once the sequence is
        exhausted. Defaults to ITEMS_PER_CONTAINS_QUERIES, preserving the
        previous behavior.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  if batch_size_limits is None:
    batch_size_limits = ITEMS_PER_CONTAINS_QUERIES
  batch_count = 0
  batch_size_limit = batch_size_limits[0]
  next_queries = []
  # Query larger items first so the server learns about big missing files as
  # early as possible.
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      # Clamp to the last configured limit once the sequence runs out.
      batch_size_limit = batch_size_limits[
          min(batch_count, len(batch_size_limits) - 1)]
  # Flush the final, possibly underfull, batch.
  if next_queries:
    yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000739
740
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # A fetch for this digest is already in flight: nothing to do.
    if digest in self._pending:
      return

    # Record that the item was referenced; verify_all_cached() later checks
    # that it is still present in the cache.
    self._accessed.add(digest)

    # Already in the cache? Then just refresh its LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # The cached copy is corrupted: drop it and download it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Kick off the actual download.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Fast path: one of the wanted digests has already arrived.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Every wanted digest must already have been scheduled through add().
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Consume completion notifications until a wanted digest shows up.
    while self._pending:
      completed = self._channel.pull()
      self._pending.remove(completed)
      self._fetched.add(completed)
      if completed in digests:
        return completed

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
836
837
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep a one-chunk lookahead in |lookahead| so the whole stream can be
    # validated before its final chunk is handed to the consumer.
    lookahead = None
    for chunk in self.stream:
      assert chunk is not None
      if lookahead is not None:
        self._inspect_chunk(lookahead, is_last=False)
        try:
          yield lookahead
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      lookahead = chunk
    if lookahead is not None:
      self._inspect_chunk(lookahead, is_last=True)
      try:
        yield lookahead
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Only the total size can be checked, and only when it is known upfront.
    if not is_last or self.expected_size == UNKNOWN_FILE_SIZE:
      return
    if self.expected_size != self.current_size:
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
885
886
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)
    # Keep the digest around so callers can react to the specific miss.
    self.digest = digest
894
895
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values: sizes of items added/evicted/read during this run and
    # a snapshot of the cache state at startup.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    """Sizes of items added during this run (returns a copy)."""
    return list(self._added)

  @property
  def evicted(self):
    """Sizes of items evicted during this run (returns a copy)."""
    return list(self._evicted)

  @property
  def used(self):
    """Sizes of items read back during this run (returns a copy)."""
    return list(self._used)

  @property
  def initial_number_items(self):
    """Number of items present in the cache when it was opened."""
    return self._initial_number_items

  @property
  def initial_size(self):
    """Total size of the cache when it was opened."""
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()
982
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000983
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Maps digest -> content string. Protected by self._lock.
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    # Nothing to clean up: there is no backing storage.
    pass

  def touch(self, digest, size):
    # In-memory items can't get corrupted, presence is enough.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # |_evicted| is a list of evicted item *sizes* (see LocalCache and
        # DiskCache._delete_file). The previous code called the nonexistent
        # list method 'add' (raising AttributeError) and passed the content
        # instead of its size.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
      return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001033
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001034
class CachePolicies(object):
  """Holder for cache retention policy knobs."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: trim once the cache grows past this many bytes; 0 means
      unbounded, so the cache is effectively a leak.
    - min_free_space: trim when free disk space drops below this value; 0
      lets the cache fill the disk unconditionally.
    - max_items: maximum number of entries to keep in the cache; 0 disables
      the limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1049
1050
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Each item is stored as a separate file named after its digest, directly
  under |cache_dir|. The LRU bookkeeping is persisted as a json file
  (STATE_FILE) in the same directory.
  """
  # Name of the json file, inside cache_dir, holding the persisted LRU state.
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim=True):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies (a CachePolicies instance).
      hash_algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
        It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim=trim)

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    # Enforce retention policies on exit, then log a summary of what happened
    # to the cache during this run (added/current/evicted sizes, free space).
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there is no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        # The state file itself must stay writable.
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        # Known item: make sure it is read-only.
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found on disk.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash so it could still be corrupted if the
    file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      # Remember the oldest referenced item; everything more recent than it in
      # the LRU order is implicitly protected from eviction (see
      # _remove_lru_file).
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    # Returns an open file handle for a cached item; raises CacheMiss when the
    # file cannot be opened.
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        # Record the size of the item read for statistics.
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      # Protect the item being written from eviction during this run.
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      #   1) Inside |content| generator in case of network or unzipping errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      self._trim()

  def _load(self, trim):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size.

    Raises Error when there is nothing to remove, or when the only candidate is
    protected and |allow_protected| is False.
    """
    self._lock.assert_locked()
    try:
      # NOTE(review): the LRU value unpacks as (size, timestamp) here while
      # _add() stores a plain size — presumably LRUDict.get_oldest() returns
      # the (value, timestamp) pair; confirm against utils/lru.py.
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to fetch the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space when
    # the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        # Query the file size; a missing file counts as zero bytes reclaimed.
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      # A file already gone is fine; anything else is worth logging.
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001395
1396
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001397class IsolatedBundle(object):
1398 """Fetched and parsed .isolated file with all dependencies."""
1399
Vadim Shtayura3148e072014-09-02 18:51:52 -07001400 def __init__(self):
1401 self.command = []
1402 self.files = {}
1403 self.read_only = None
1404 self.relative_cwd = None
1405 # The main .isolated file, a IsolatedFile instance.
1406 self.root = None
1407
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001408 def fetch(self, fetch_queue, root_isolated_hash, algo):
1409 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001410
1411 It enables support for "included" .isolated files. They are processed in
1412 strict order but fetched asynchronously from the cache. This is important so
1413 that a file in an included .isolated file that is overridden by an embedding
1414 .isolated file is not fetched needlessly. The includes are fetched in one
1415 pass and the files are fetched as soon as all the ones on the left-side
1416 of the tree were fetched.
1417
1418 The prioritization is very important here for nested .isolated files.
1419 'includes' have the highest priority and the algorithm is optimized for both
1420 deep and wide trees. A deep one is a long link of .isolated files referenced
1421 one at a time by one item in 'includes'. A wide one has a large number of
1422 'includes' in a single .isolated file. 'left' is defined as an included
1423 .isolated file earlier in the 'includes' list. So the order of the elements
1424 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001425
1426 As a side effect this method starts asynchronous fetch of all data files
1427 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1428 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001429 """
1430 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1431
1432 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1433 pending = {}
1434 # Set of hashes of already retrieved items to refuse recursive includes.
1435 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001436 # Set of IsolatedFile's whose data files have already being fetched.
1437 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001438
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001439 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001440 h = isolated_file.obj_hash
1441 if h in seen:
1442 raise isolated_format.IsolatedError(
1443 'IsolatedFile %s is retrieved recursively' % h)
1444 assert h not in pending
1445 seen.add(h)
1446 pending[h] = isolated_file
1447 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1448
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001449 # Start fetching root *.isolated file (single file, not the whole bundle).
1450 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001451
1452 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001453 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001454 item_hash = fetch_queue.wait(pending)
1455 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001456 with fetch_queue.cache.getfileobj(item_hash) as f:
1457 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001458
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001459 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001460 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001461 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001462
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001463 # Always fetch *.isolated files in traversal order, waiting if necessary
1464 # until next to-be-processed node loads. "Waiting" is done by yielding
1465 # back to the outer loop, that waits until some *.isolated is loaded.
1466 for node in isolated_format.walk_includes(self.root):
1467 if node not in processed:
1468 # Not visited, and not yet loaded -> wait for it to load.
1469 if not node.is_loaded:
1470 break
1471 # Not visited and loaded -> process it and continue the traversal.
1472 self._start_fetching_files(node, fetch_queue)
1473 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001474
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001475 # All *.isolated files should be processed by now and only them.
1476 all_isolateds = set(isolated_format.walk_includes(self.root))
1477 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001478
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001479 # Extract 'command' and other bundle properties.
1480 for node in isolated_format.walk_includes(self.root):
1481 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001482 self.relative_cwd = self.relative_cwd or ''
1483
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001484 def _start_fetching_files(self, isolated, fetch_queue):
1485 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001486
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001487 Modifies self.files.
1488 """
maruel10bea7b2016-12-07 05:03:49 -08001489 files = isolated.data.get('files', {})
1490 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1491 for filepath, properties in files.iteritems():
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001492 # Root isolated has priority on the files being mapped. In particular,
1493 # overridden files must not be fetched.
1494 if filepath not in self.files:
1495 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001496
1497 # Make sure if the isolated is read only, the mode doesn't have write
1498 # bits.
1499 if 'm' in properties and self.read_only:
1500 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1501
1502 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001503 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001504 fetch_queue.add(
1505 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1506
1507 def _update_self(self, node):
1508 """Extracts bundle global parameters from loaded *.isolated file.
1509
1510 Will be called with each loaded *.isolated file in order of traversal of
1511 isolated include graph (see isolated_format.walk_includes).
1512 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001513 # Grabs properties.
1514 if not self.command and node.data.get('command'):
1515 # Ensure paths are correctly separated on windows.
1516 self.command = node.data['command']
1517 if self.command:
1518 self.command[0] = self.command[0].replace('/', os.path.sep)
1519 self.command = tools.fix_python_path(self.command)
1520 if self.read_only is None and node.data.get('read_only') is not None:
1521 self.read_only = node.data['read_only']
1522 if (self.relative_cwd is None and
1523 node.data.get('relative_cwd') is not None):
1524 self.relative_cwd = node.data['relative_cwd']
1525
1526
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001527def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001528 """Returns Storage class that can upload and download from |namespace|.
1529
1530 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001531 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001532 namespace: isolate namespace to operate in, also defines hashing and
1533 compression scheme used, i.e. namespace names that end with '-gzip'
1534 store compressed data.
1535
1536 Returns:
1537 Instance of Storage.
1538 """
aludwin81178302016-11-30 17:18:49 -08001539 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001540
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001541
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001542def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001543 """Uploads the given tree to the given url.
1544
1545 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001546 base_url: The url of the isolate server to upload to.
1547 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001548 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001549 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001550 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001551 # Filter out symlinks, since they are not represented by items on isolate
1552 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001553 items = []
1554 seen = set()
1555 skipped = 0
1556 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001557 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001558 if 'l' not in metadata and filepath not in seen:
1559 seen.add(filepath)
1560 item = FileItem(
1561 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001562 digest=metadata['h'],
1563 size=metadata['s'],
1564 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001565 items.append(item)
1566 else:
1567 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001568
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001569 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001570 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001571 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001572
1573
maruel4409e302016-07-19 14:25:51 -07001574def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001575 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001576
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001577 Arguments:
1578 isolated_hash: hash of the root *.isolated file.
1579 storage: Storage class that communicates with isolate storage.
1580 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001581 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001582 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001583
1584 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001585 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001586 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001587 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001588 'fetch_isolated(%s, %s, %s, %s, %s)',
1589 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001590 # Hash algorithm to use, defined by namespace |storage| is using.
1591 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001592 with cache:
1593 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001594 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001595
1596 with tools.Profiler('GetIsolateds'):
1597 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001598 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001599 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07001600 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001601 try:
maruel1ceb3872015-10-14 06:10:44 -07001602 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001603 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001604 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001605 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1606 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001607
1608 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001609 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001610
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001611 with tools.Profiler('GetRest'):
1612 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001613 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001614 create_directories(outdir, bundle.files)
1615 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001616
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001617 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001618 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001619 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001620
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001621 # Multimap: digest -> list of pairs (path, props).
1622 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001623 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001624 if 'h' in props:
1625 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001626
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001627 # Now block on the remaining files to be downloaded and mapped.
1628 logging.info('Retrieving remaining files (%d of them)...',
1629 fetch_queue.pending_count)
1630 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001631 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001632 while remaining:
1633 detector.ping()
1634
1635 # Wait for any item to finish fetching to cache.
1636 digest = fetch_queue.wait(remaining)
1637
tansell9e04a8d2016-07-28 09:31:59 -07001638 # Create the files in the destination using item in cache as the
1639 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001640 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001641 fullpath = os.path.join(outdir, filepath)
1642
1643 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001644 filetype = props.get('t', 'basic')
1645
1646 if filetype == 'basic':
1647 file_mode = props.get('m')
1648 if file_mode:
1649 # Ignore all bits apart from the user
1650 file_mode &= 0700
1651 putfile(
1652 srcfileobj, fullpath, file_mode,
1653 use_symlink=use_symlinks)
1654
tansell26de79e2016-11-13 18:41:11 -08001655 elif filetype == 'tar':
1656 basedir = os.path.dirname(fullpath)
1657 with tarfile.TarFile(fileobj=srcfileobj) as extractor:
1658 for ti in extractor:
1659 if not ti.isfile():
1660 logging.warning(
1661 'Path(%r) is nonfile (%s), skipped',
1662 ti.name, ti.type)
1663 continue
1664 fp = os.path.normpath(os.path.join(basedir, ti.name))
1665 if not fp.startswith(basedir):
1666 logging.error(
1667 'Path(%r) is outside root directory',
1668 fp)
1669 ifd = extractor.extractfile(ti)
1670 file_path.ensure_tree(os.path.dirname(fp))
1671 putfile(ifd, fp, 0700, ti.size)
1672
tanselle4288c32016-07-28 09:45:40 -07001673 elif filetype == 'ar':
1674 basedir = os.path.dirname(fullpath)
1675 extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
1676 for ai, ifd in extractor:
1677 fp = os.path.normpath(os.path.join(basedir, ai.name))
tansell26de79e2016-11-13 18:41:11 -08001678 if not fp.startswith(basedir):
1679 logging.error(
1680 'Path(%r) is outside root directory',
1681 fp)
tanselle4288c32016-07-28 09:45:40 -07001682 file_path.ensure_tree(os.path.dirname(fp))
1683 putfile(ifd, fp, 0700, ai.size)
1684
1685 else:
1686 raise isolated_format.IsolatedError(
1687 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001688
1689 # Report progress.
1690 duration = time.time() - last_update
1691 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1692 msg = '%d files remaining...' % len(remaining)
1693 print msg
1694 logging.info(msg)
1695 last_update = time.time()
1696
1697 # Cache could evict some items we just tried to fetch, it's a fatal error.
1698 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001699 raise isolated_format.MappingError(
1700 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001701 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001702
1703
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001704def directory_to_metadata(root, algo, blacklist):
1705 """Returns the FileItem list and .isolated metadata for a directory."""
1706 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001707 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001708 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001709 metadata = {
1710 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001711 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001712 for relpath in paths
1713 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001714 for v in metadata.itervalues():
1715 v.pop('t')
1716 items = [
1717 FileItem(
1718 path=os.path.join(root, relpath),
1719 digest=meta['h'],
1720 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001721 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001722 for relpath, meta in metadata.iteritems() if 'h' in meta
1723 ]
1724 return items, metadata
1725
1726
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001727def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001728 """Stores every entries and returns the relevant data.
1729
1730 Arguments:
1731 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001732 files: list of file paths to upload. If a directory is specified, a
1733 .isolated file is created and its hash is returned.
1734 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001735
1736 Returns:
1737 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1738 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001739 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001740 assert all(isinstance(i, unicode) for i in files), files
1741 if len(files) != len(set(map(os.path.abspath, files))):
1742 raise Error('Duplicate entries found.')
1743
maruel064c0a32016-04-05 11:47:15 -07001744 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001745 results = []
1746 # The temporary directory is only created as needed.
1747 tempdir = None
1748 try:
1749 # TODO(maruel): Yield the files to a worker thread.
1750 items_to_upload = []
1751 for f in files:
1752 try:
1753 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001754 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001755 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001756 items, metadata = directory_to_metadata(
1757 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001758
1759 # Create the .isolated file.
1760 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001761 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1762 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001763 os.close(handle)
1764 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001765 'algo':
1766 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001767 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001768 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001769 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001770 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001771 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001772 items_to_upload.extend(items)
1773 items_to_upload.append(
1774 FileItem(
1775 path=isolated,
1776 digest=h,
maruel12e30012015-10-09 11:55:35 -07001777 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001778 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001779 results.append((h, f))
1780
maruel12e30012015-10-09 11:55:35 -07001781 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001782 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001783 items_to_upload.append(
1784 FileItem(
1785 path=filepath,
1786 digest=h,
maruel12e30012015-10-09 11:55:35 -07001787 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001788 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001789 results.append((h, f))
1790 else:
1791 raise Error('%s is neither a file or directory.' % f)
1792 except OSError:
1793 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07001794 uploaded = storage.upload_items(items_to_upload)
1795 cold = [i for i in items_to_upload if i in uploaded]
1796 hot = [i for i in items_to_upload if i not in uploaded]
1797 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001798 finally:
maruel12e30012015-10-09 11:55:35 -07001799 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001800 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001801
1802
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001803def archive(out, namespace, files, blacklist):
1804 if files == ['-']:
1805 files = sys.stdin.readlines()
1806
1807 if not files:
1808 raise Error('Nothing to upload')
1809
1810 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001811 blacklist = tools.gen_blacklist(blacklist)
1812 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001813 # Ignore stats.
1814 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001815 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1816
1817
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001818@subcommand.usage('<file1..fileN> or - to read from stdin')
1819def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001820 """Archives data to the server.
1821
1822 If a directory is specified, a .isolated file is created the whole directory
1823 is uploaded. Then this .isolated file can be included in another one to run
1824 commands.
1825
1826 The commands output each file that was processed with its content hash. For
1827 directories, the .isolated generated for the directory is listed as the
1828 directory entry itself.
1829 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001830 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001831 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001832 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001833 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001834 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001835 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001836 except Error as e:
1837 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001838 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001839
1840
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  # --isolated and --file are mutually exclusive and one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to map a tree over an existing file or a non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # |rel| is already the absolute run directory since options.target was
        # made absolute above; the previous code redundantly joined it with
        # options.target a second time (a no-op for absolute paths).
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
1915
1916
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001917def add_archive_options(parser):
1918 parser.add_option(
1919 '--blacklist',
1920 action='append', default=list(DEFAULT_BLACKLIST),
1921 help='List of regexp to use as blacklist filter when uploading '
1922 'directories')
1923
1924
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001925def add_isolate_server_options(parser):
1926 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001927 parser.add_option(
1928 '-I', '--isolate-server',
1929 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001930 help='URL of the Isolate Server to use. Defaults to the environment '
1931 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1932 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001933 parser.add_option(
aludwin81178302016-11-30 17:18:49 -08001934 '--is-grpc', action='store_true', help='Communicate to Isolate via gRPC')
1935 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001936 '--namespace', default='default-gzip',
1937 help='The namespace to use on the Isolate Server, default: %default')
1938
1939
nodir55be77b2016-05-03 09:39:57 -07001940def process_isolate_server_options(
1941 parser, options, set_exception_handler, required):
1942 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001943
1944 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001945 """
1946 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001947 if required:
1948 parser.error('--isolate-server is required.')
1949 return
1950
aludwin81178302016-11-30 17:18:49 -08001951 if options.is_grpc:
1952 isolate_storage.set_storage_api_class(isolate_storage.IsolateServerGrpc)
1953 else:
1954 try:
1955 options.isolate_server = net.fix_url(options.isolate_server)
1956 except ValueError as e:
1957 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001958 if set_exception_handler:
1959 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001960 try:
1961 return auth.ensure_logged_in(options.isolate_server)
1962 except ValueError as e:
1963 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001964
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001965
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001966def add_cache_options(parser):
1967 cache_group = optparse.OptionGroup(parser, 'Cache management')
1968 cache_group.add_option(
1969 '--cache', metavar='DIR',
1970 help='Directory to keep a local cache of the files. Accelerates download '
1971 'by reusing already downloaded files. Default=%default')
1972 cache_group.add_option(
1973 '--max-cache-size',
1974 type='int',
1975 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001976 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001977 help='Trim if the cache gets larger than this value, default=%default')
1978 cache_group.add_option(
1979 '--min-free-space',
1980 type='int',
1981 metavar='NNN',
1982 default=2*1024*1024*1024,
1983 help='Trim if disk free space becomes lower than this value, '
1984 'default=%default')
1985 cache_group.add_option(
1986 '--max-items',
1987 type='int',
1988 metavar='NNN',
1989 default=100000,
1990 help='Trim if more than this number of items are in the cache '
1991 'default=%default')
1992 parser.add_option_group(cache_group)
1993
1994
nodirf33b8d62016-10-26 22:34:58 -07001995def process_cache_options(options, trim=True):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001996 if options.cache:
1997 policies = CachePolicies(
1998 options.max_cache_size, options.min_free_space, options.max_items)
1999
2000 # |options.cache| path may not exist until DiskCache() instance is created.
2001 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002002 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002003 policies,
nodirf33b8d62016-10-26 22:34:58 -07002004 isolated_format.get_hash_algo(options.namespace),
2005 trim=trim)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002006 else:
2007 return MemoryCache()
2008
2009
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002010class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002011 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002012 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002013 self,
2014 version=__version__,
2015 prog=os.path.basename(sys.modules[__name__].__file__),
2016 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002017 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002018
2019 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002020 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002021 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002022 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002023 return options, args
2024
2025
def main(args):
  """Dispatches |args| to the matching subcommand and returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002029
2030
if __name__ == '__main__':
  # Keep crashes on the console instead of triggering OS error reporting.
  subprocess42.inhibit_os_error_reporting()
  # Console/encoding/color setup shared by the command line tools here.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  # NOTE(review): presumably enables symlink handling on platforms where it is
  # off by default — confirm in file_path.
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))