#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.1'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
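

# Illustrative helper (not part of the original module): how a relative path
# could be tested against DEFAULT_BLACKLIST. The regexes above match compiled
# python files, vim swap files, and .git/.svn directory entries; the real
# matching logic used by callers may differ.
def _example_is_blacklisted(relpath):
  return any(re.match(pattern, relpath) for pattern in DEFAULT_BLACKLIST)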


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
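

# Illustrative example (not part of the original module): the two chunked
# helpers above compose naturally; this copies a file by streaming chunks
# from file_read() into file_write().
def _example_copy_file(src, dst):
  return file_write(dst, file_read(src))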


def fileobj_path(fileobj):
  """Returns the file system path for a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created using something like open("test.txt")
  # name will end up being a str (such as a function outside our control, like
  # the standard library). We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is potentially faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
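

# Illustrative usage (not part of the original module; paths are
# hypothetical): materialize a cached file at a destination. Since the mode
# has no write bits and the source is a real file, putfile() can link instead
# of copying.
def _example_putfile(cache_path, dstpath):
  with fs.open(cache_path, 'rb') as srcfileobj:
    putfile(srcfileobj, dstpath, file_mode=0400)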


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
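

# Illustrative round trip (not part of the original module): compressing then
# decompressing a stream of chunks must reproduce the original bytes.
def _example_zip_roundtrip(chunks):
  data = ''.join(chunks)
  return ''.join(zip_decompress(zip_compress(chunks))) == data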


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # Strip the leading dot returned by os.path.splitext() so the extension
  # matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
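

# Illustrative call (not part of the original module; paths are hypothetical):
# for files ['a/b/c.txt', 'a/d.txt'], this creates base/a and base/a/b.
def _example_create_layout(base_directory):
  files = [os.path.join('a', 'b', 'c.txt'), os.path.join('a', 'd.txt')]
  create_directories(base_directory, files)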


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific
            information describing how to upload the item (for example in
            case of cloud storage, it is signed upload URLs). It can later be
            passed to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
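

# Illustrative flow (not part of the original module): archiving a set of
# local files through Storage. |storage_api| is assumed to be a configured
# StorageApi instance, e.g. obtained from isolate_storage.
def _example_archive(storage_api, paths):
  with Storage(storage_api) as storage:
    return storage.upload_items([FileItem(p) for p in paths])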


def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
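

# Illustrative behavior (not part of the original module): with
# ITEMS_PER_CONTAINS_QUERIES == (20, 20, 50, 50, 50, 100), 300 items produce
# batches of sizes 20, 20, 50, 50, 50, 100 and a final partial batch of 10;
# once the schedule is exhausted the last batch size repeats.
def _example_batch_sizes(items):
  return [len(batch) for batch in batch_items_for_check(items)]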


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
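

# Illustrative usage (not part of the original module): pulling a set of
# digests through a FetchQueue until every one of them has been retrieved.
def _example_fetch_all(storage, cache, digests):
  queue = FetchQueue(storage, cache)
  for digest in digests:
    queue.add(digest)
  remaining = set(digests)
  while remaining:
    # wait() returns the first digest of |remaining| that finished fetching.
    remaining.discard(queue.wait(remaining))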


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content.
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
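

# Illustrative usage (not part of the original module): wrap a fetched chunk
# stream so size and digest are checked before the last chunk reaches the
# consumer. hashlib is imported locally since this module does not import it
# at module scope.
def _example_verified_read(stream, digest, size):
  import hashlib
  verifier = FetchStreamVerifier(stream, hashlib.sha1, digest, size)
  return ''.join(verifier.run())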


class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # self._evicted is a list of sizes; record the evicted buffer's size.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0
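

# Illustrative round trip (not part of the original module): writing a buffer
# into a MemoryCache and reading it back through getfileobj(). |algo| is
# assumed to be a hash factory such as hashlib.sha1.
def _example_memory_cache_roundtrip(data, algo):
  cache = MemoryCache()
  digest = cache.write(algo(data).hexdigest(), [data])
  return cache.getfileobj(digest).read() == data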
1069
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001070
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001071class CachePolicies(object):
1072 def __init__(self, max_cache_size, min_free_space, max_items):
1073 """
1074 Arguments:
1075 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1076 cache is effectively a leak.
1077 - min_free_space: Trim if disk free space becomes lower than this value. If
1078 0, it unconditionally fill the disk.
1079 - max_items: Maximum number of items to keep in the cache. If 0, do not
1080 enforce a limit.
1081 """
1082 self.max_cache_size = max_cache_size
1083 self.min_free_space = min_free_space
1084 self.max_items = max_items
1085
Marc-Antoine Ruel211a9552018-01-17 16:47:29 -05001086 def __str__(self):
1087 return (
1088 'CachePolicies(cache=%dbytes, %d items; min_free_space=%d)') % (
1089 self.max_cache_size, self.max_items, self.min_free_space)
1090
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001091
1092class DiskCache(LocalCache):
1093 """Stateful LRU cache in a flat hash table in a directory.
1094
1095 Saves its state as json file.
1096 """
maruel12e30012015-10-09 11:55:35 -07001097 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001098
maruele6fc9382017-05-04 09:03:48 -07001099 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001100 """
1101 Arguments:
1102 cache_dir: directory where to place the cache.
1103 policies: cache retention policies.
1104 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001105 trim: if True to enforce |policies| right away.
1106 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001107 """
maruel064c0a32016-04-05 11:47:15 -07001108 # All protected methods (starting with '_') except _path should be called
1109 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001110 super(DiskCache, self).__init__()
1111 self.cache_dir = cache_dir
1112 self.policies = policies
1113 self.hash_algo = hash_algo
1114 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001115 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001116 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001117 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001118 file_path.ensure_tree(self.cache_dir)
1119 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001120 # The first item in the LRU cache that must not be evicted during this run
1121 # since it was referenced. All items more recent that _protected in the LRU
1122 # cache are also inherently protected. It could be a set() of all items
1123 # referenced but this increases memory usage without a use case.
1124 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001125 # Cleanup operations done by self._load(), if any.
1126 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001127 with tools.Profiler('Setup'):
1128 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001129 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001130
nodirbe642ff2016-06-09 15:51:51 -07001131 def __contains__(self, digest):
1132 with self._lock:
1133 return digest in self._lru
1134
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001135 def __enter__(self):
1136 return self
1137
1138 def __exit__(self, _exc_type, _exec_value, _traceback):
1139 with tools.Profiler('CleanupTrimming'):
1140 with self._lock:
1141 self._trim()
1142
1143 logging.info(
1144 '%5d (%8dkb) added',
1145 len(self._added), sum(self._added) / 1024)
1146 logging.info(
1147 '%5d (%8dkb) current',
1148 len(self._lru),
1149 sum(self._lru.itervalues()) / 1024)
1150 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001151 '%5d (%8dkb) evicted',
1152 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001153 logging.info(
1154 ' %8dkb free',
1155 self._free_disk / 1024)
1156 return False
1157
1158 def cached_set(self):
1159 with self._lock:
1160 return self._lru.keys_set()
1161
maruel36a963d2016-04-08 17:15:49 -07001162 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001163 """Cleans up the cache directory.
1164
1165 Ensures there is no unknown files in cache_dir.
1166 Ensures the read-only bits are set correctly.
1167
1168 At that point, the cache was already loaded, trimmed to respect cache
1169 policies.
1170 """
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001171 with self._lock:
1172 fs.chmod(self.cache_dir, 0700)
1173 # Ensure that all files listed in the state still exist and add new ones.
1174 previous = self._lru.keys_set()
1175 # It'd be faster if there were a readdir() function.
1176 for filename in fs.listdir(self.cache_dir):
1177 if filename == self.STATE_FILE:
1178 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1179 continue
1180 if filename in previous:
1181 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1182 previous.remove(filename)
1183 continue
1184
1185 # An untracked file. Delete it.
1186 logging.warning('Removing unknown file %s from cache', filename)
1187 p = self._path(filename)
1188 if fs.isdir(p):
1189 try:
1190 file_path.rmtree(p)
1191 except OSError:
1192 pass
1193 else:
1194 file_path.try_remove(p)
maruel2e8d0f52016-07-16 07:51:29 -07001195 continue
1196
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001197 if previous:
1198 # Filter out entries that were not found.
1199 logging.warning('Removed %d lost files', len(previous))
1200 for filename in previous:
1201 self._lru.pop(filename)
1202 self._save()
maruel36a963d2016-04-08 17:15:49 -07001203
1204 # What remains to be done is to hash every single item to
1205 # detect corruption, then save to ensure state.json is up to date.
1206 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1207 # TODO(maruel): Let's revisit once directory metadata is stored in
1208 # state.json so only the files that had been mapped since the last cleanup()
1209 # call are manually verified.
1210 #
1211 #with self._lock:
1212 # for digest in self._lru:
1213 # if not isolated_format.is_valid_hash(
1214 # self._path(digest), self.hash_algo):
1215 # self.evict(digest)
1216 # logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash, so the item could still be corrupted
    if the file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
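    # Returns an open file object for the cached item and records its size in
    # self._used. Callers use it as a context manager (see fetch_isolated()
    # below); raises CacheMiss if the file is absent.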
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
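    # |content| is an iterable of chunks (possibly a generator fed from the
    # network, as hinted by the error handling below); it is streamed to disk
    # by file_write().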
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have had
    # its write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator, in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself, in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns the digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns the timestamp of last use of an item.

    Raises KeyError if the item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads the state of the cache from the state json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't keep a broken state file around.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))
  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
      if fs.isfile(self.state_file):
        file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Enforces the cache retention policies: maximum size, maximum number of
    items and minimum free disk space.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
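    # self._protected holds the first digest touched or written since the
    # cache was loaded (see touch() and write() above); when allow_protected
    # is False it must never be evicted.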
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        total_size = sum(self._lru.itervalues()) + size
        msg = (
            'Not enough space to fetch the whole isolated tree.\n'
            '  %s\n  cache=%d bytes, %d items; %d bytes of free space') % (
                self.policies, total_size, len(self._lru) + 1, self._free_disk)
        raise Error(msg)
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item to the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached values. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space
    # when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))

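# A minimal DiskCache usage sketch. The constructor arguments mirror the ones
# used by process_cache_options() near the end of this file; |digest|,
# |chunks|, |policies| and |hash_algo| below are illustrative placeholders:
#
#   cache = DiskCache(u'/path/to/cache', policies, hash_algo)
#   with cache:
#     cache.write(digest, chunks)
#     with cache.getfileobj(digest) as f:
#       data = f.read()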

class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated file and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure that if the isolated is read-only, the mode doesn't have
        # write bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grab properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(url, namespace):
  """Returns a Storage instance that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use for shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression schemes used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(isolate_storage.get_storage_api(url, namespace))

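# A typical use, as in upload_tree() below (the server URL is illustrative):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     uploaded = storage.upload_items(items)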

def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given URL.

  Arguments:
    base_url: The URL of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
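  # The metadata dicts use the .isolated single-letter keys handled below:
  # 'h' is the content hash, 's' the size, 'l' marks a symlink and
  # 'priority' == '0' flags a high priority item.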
  # Convert |infiles| into a list of FileItem objects, skipping duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated or symlink entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with the isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map the file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file '
                      '(algo was %s, hash size was %d)',
                      isolated_hash, algo(), algo().digest_size)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError as e:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated files and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create the file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
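      # Each props dict uses the .isolated single-letter keys handled below:
      # 'h' (content hash), 's' (size), 'm' (file mode), 't' (file type).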
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using the item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'tar':
                basedir = os.path.dirname(fullpath)
                with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
                  for ti in t:
                    if not ti.isfile():
                      logging.warning(
                          'Path(%r) is nonfile (%s), skipped',
                          ti.name, ti.type)
                      continue
                    fp = os.path.normpath(os.path.join(basedir, ti.name))
                    if not fp.startswith(basedir):
                      # Skip entries that would escape the destination
                      # directory.
                      logging.error(
                          'Path(%r) is outside root directory',
                          fp)
                      continue
                    ifd = t.extractfile(ti)
                    file_path.ensure_tree(os.path.dirname(fp))
                    putfile(ifd, fp, 0700, ti.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r' % filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # The cache could have evicted some items we just tried to fetch; it's a
  # fatal error.
  if not fetch_queue.verify_all_cached():
    raise isolated_format.MappingError(
        'Cache is too small to hold all requested files')
  return bundle


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
      relpath: isolated_format.file_to_metadata(
          os.path.join(root, relpath), {}, 0, algo, False)
      for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
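    # upload_items() returns the items that actually had to be uploaded;
    # 'cold' is those, 'hot' is the ones the server already had.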
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
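
  Example (the server URL is illustrative):
    isolateserver.py archive -I https://isolate.example.com some_directory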
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
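
  Example (the server URL and hash are illustrative):
    isolateserver.py download -I https://isolate.example.com -s <hash> -t out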
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to the parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, **kwargs):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace),
        **kwargs)
  else:
    return MemoryCache()

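# A typical lifecycle, following CMDdownload() above (sketch):
#
#   cache = process_cache_options(options)
#   cache.cleanup()
#   with cache:
#     bundle = fetch_isolated(...)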

class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))