#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.1'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


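# Illustrative sketch (not part of the original module): how the batch-size
# schedule above plays out. With e.g. 240 items, the successive '/pre-upload'
# batches hold 20, 20, 50, 50, 50 and 50 items.
def _example_contains_query_schedule(num_items=240):
  """Sketch: returns the '/pre-upload' batch sizes used for |num_items|."""
  sizes = []
  index = 0
  while num_items > 0:
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    sizes.append(min(limit, num_items))
    num_items -= sizes[-1]
    index += 1
  return sizes

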
# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


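# Illustrative sketch (not part of the original module): file_read and
# file_write compose as generator pipelines; this streams a chunked copy of a
# file. The paths are hypothetical.
def _example_copy_via_chunks(src=u'/tmp/in.bin', dst=u'/tmp/out/in.bin'):
  """Sketch: streams |src| into |dst| and returns the bytes written."""
  return file_write(dst, file_read(src))

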
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by code outside our control (e.g. the
  # standard library's open("test.txt")), name will be a str. We want all our
  # paths to be unicode objects, so decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


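# Illustrative sketch (not part of the original module): putfile links rather
# than copies when the destination mode has no write bits, as described in its
# docstring. The source path is hypothetical.
def _example_putfile(dstpath=u'/tmp/out/item'):
  """Sketch: materializes a cached file read-only at |dstpath|."""
  with fs.open(u'/tmp/cache/deadbeef', 'rb') as srcfileobj:
    putfile(srcfileobj, dstpath, file_mode=0400)

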
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


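# Illustrative sketch (not part of the original module): zip_compress and
# zip_decompress are inverses over chunked streams.
def _example_zip_roundtrip(data='x' * 1000):
  """Sketch: compresses then decompresses |data|, asserting a round trip."""
  compressed = ''.join(zip_compress([data]))
  assert ''.join(zip_decompress([compressed])) == data
  return len(compressed)

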
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


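# Illustrative sketch (not part of the original module): only the directory
# skeleton is materialized; the files themselves are written later. The base
# directory is hypothetical.
def _example_create_directories(base=u'/tmp/out'):
  """Sketch: creates <base>/a and <base>/a/b for two hypothetical files."""
  create_directories(
      base, [os.path.join(u'a', u'b', u'c.txt'), os.path.join(u'a', u'd.txt')])

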
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks that the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


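# Illustrative sketch (not part of the original module): an Item created
# without a digest gets one computed by prepare(); hashlib.sha1 here assumes
# the namespace's default hash algorithm.
def _example_buffer_item_digest():
  """Sketch: computes a BufferItem's digest."""
  import hashlib  # Imported here since only this sketch needs it.
  item = BufferItem('hello')
  item.prepare(hashlib.sha1)
  return item.digest

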
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
    logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

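  # Illustrative sketch (not part of the original class): typical use pairs a
  # Storage with a StorageApi and makes a single upload_items() call. The
  # server URL, namespace and factory function below are hypothetical.
  #
  #   api = isolate_storage.get_storage_api(
  #       u'https://isolate.example.com', 'default-gzip')
  #   with Storage(api) as storage:
  #     storage.upload_items([BufferItem('hello'), FileItem(u'/tmp/data')])
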
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


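# Illustrative sketch (not part of the original module): items are sorted by
# decreasing size before batching, so the larger (more likely changed) files
# are queried first.
def _example_batching():
  """Sketch: batches three small in-memory items; all fit in one batch."""
  items = [BufferItem('x' * n) for n in (3, 1, 2)]
  # Returns [[3, 2, 1]] since the first batch holds up to 20 items.
  return [[i.size for i in batch] for batch in batch_items_for_check(items)]

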
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


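# Illustrative sketch (not part of the original module): the usual FetchQueue
# flow is add() for every needed digest, then wait() until each one lands in
# the cache. |storage|, |cache| and |digests| are assumed to be provided by
# the caller.
def _example_fetch_all(storage, cache, digests):
  """Sketch: fetches |digests| into |cache| and blocks until all are done."""
  queue = FetchQueue(storage, cache)
  for digest in digests:
    queue.add(digest)
  pending = set(digests)
  while pending:
    pending.discard(queue.wait(pending))

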
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)


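# Illustrative sketch (not part of the original module): wiring a raw chunk
# stream through FetchStreamVerifier; a wrong digest or size raises IOError
# mid-iteration. SHA-1 is assumed as the hash algorithm.
def _example_verified_read(chunks, expected_digest, expected_size):
  """Sketch: returns the verified, concatenated content of |chunks|."""
  import hashlib  # Imported here since only this sketch needs it.
  verifier = FetchStreamVerifier(
      chunks, hashlib.sha1, expected_digest, expected_size)
  return ''.join(verifier.run())

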
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  @property
  def number_items(self):
    """Returns the number of items in the cache."""
    raise NotImplementedError()

  @property
  def total_size(self):
    """Returns the total size of the cache in bytes."""
    raise NotImplementedError()

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file-like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  @property
  def number_items(self):
    with self._lock:
      return len(self._contents)

  @property
  def total_size(self):
    with self._lock:
      return sum(len(i) for i in self._contents.itervalues())

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # Record the evicted item's size, as the other profiling lists do.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0


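# Illustrative sketch (not part of the original module): MemoryCache keeps
# contents in a plain dict, so write() followed by getfileobj() round-trips
# without touching the file system. The digest below is made up.
def _example_memory_cache_roundtrip():
  """Sketch: stores then reads back a value under a fake digest."""
  cache = MemoryCache()
  digest = cache.write('deadbeef', ['hello ', 'world'])
  return cache.getfileobj(digest).read()  # 'hello world'

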
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items

  def __str__(self):
    return (
        'CachePolicies(cache=%dbytes, %d items; min_free_space=%d)') % (
            self.max_cache_size, self.max_items, self.min_free_space)


class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
          It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the
    # LRU cache are also inherently protected. It could be a set() of all
    # items referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim, time_fn)

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  @property
  def number_items(self):
    with self._lock:
      return len(self._lru)

  @property
  def total_size(self):
    with self._lock:
      return sum(self._lru.itervalues())

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    with self._lock:
      fs.chmod(self.cache_dir, 0700)
      # Ensure that all files listed in the state still exist and add new ones.
      previous = self._lru.keys_set()
      # It'd be faster if there were a readdir() function.
      for filename in fs.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          fs.chmod(os.path.join(self.cache_dir, filename), 0600)
          continue
        if filename in previous:
          fs.chmod(os.path.join(self.cache_dir, filename), 0400)
          previous.remove(filename)
          continue

        # An untracked file. Delete it.
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if fs.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            pass
        else:
          file_path.try_remove(p)
        continue

      if previous:
        # Filter out entries that were not found.
        logging.warning('Removed %d lost files', len(previous))
        for filename in previous:
          self._lru.pop(filename)
        self._save()

    # What remains to be done is to hash every single item to detect
    # corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last
    # cleanup() call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash, so the file could still be corrupted
    if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True
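
  # Usage sketch (the digest and size below are hypothetical): touch() is how
  # a cache hit is validated before mapping a file; a False return means the
  # entry must be treated as a miss and refetched:
  #
  #   if not cache.touch(digest, expected_size):
  #     cache.evict(digest)  # stale or corrupted entry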

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have its
    # write access bit removed, which would cause the file_write() call to fail
    # to open it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator, in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself, in case of disk IO errors.
      # In either case, delete the incomplete file and propagate the exception
      # to the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads state of the cache from the json state file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know about and makes sure enough free space
    exists.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        total_size = sum(self._lru.itervalues()) + size
        msg = (
            'Not enough space to fetch the whole isolated tree.\n'
            '  %s\n cache=%dbytes, %d items; %sb free_space') % (
            self.policies, total_size, len(self._lru) + 1, self._free_disk)
        raise Error(msg)
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached values. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space
    # when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))

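# Usage sketch for DiskCache (the path, policy values and digest below are
# illustrative; assumes the constructor signature used above: cache_dir,
# policies, hash_algo, trim, time_fn):
#
#   cache = DiskCache(
#       u'/tmp/isolate_cache',
#       CachePolicies(50*1024*1024*1024, 2*1024*1024*1024, 100000),
#       hashlib.sha1, trim=True)
#   with cache:
#     cache.write(digest, chunks)  # Trimming happens on __exit__.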

class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(isolate_storage.get_storage_api(url, namespace))

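# Example (sketch; the server URL is illustrative, not taken from this file).
# The returned Storage must be used as a context manager so that network
# resources are released:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     storage.upload_items(items)  # |items| as built by upload_tree() below.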

def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)

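# Sketch of the |infiles| shape upload_tree() expects (values are
# hypothetical; 'h' is the content hash, 's' the size, 'l' a symlink target):
#
#   infiles = [
#     (u'/abs/path/foo.txt', {'h': u'<digest>', 's': 123, 'priority': '0'}),
#     (u'/abs/path/link', {'l': u'foo.txt'}),  # Symlink entry, skipped above.
#   ]
#   upload_tree('https://isolate.example.com', infiles, 'default-gzip')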

def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file '
                      '(algo was %s, hash size was %d)',
                      isolated_hash, algo(), algo().digest_size)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError as e:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated and start loading rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'tar':
                basedir = os.path.dirname(fullpath)
                with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
                  for ti in t:
                    if not ti.isfile():
                      logging.warning(
                          'Path(%r) is nonfile (%s), skipped',
                          ti.name, ti.type)
                      continue
                    fp = os.path.normpath(os.path.join(basedir, ti.name))
                    if not fp.startswith(basedir):
                      logging.error(
                          'Path(%r) is outside root directory',
                          fp)
                      # Skip entries that would escape the destination
                      # directory.
                      continue
                    ifd = t.extractfile(ti)
                    file_path.ensure_tree(os.path.dirname(fp))
                    putfile(ifd, fp, 0700, ti.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r', filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n cache=%dbytes, %d items; %sb free_space') % (
        cache.policies, cache.total_size, cache.number_items, free_disk)
    raise isolated_format.MappingError(msg)
  return bundle

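# Minimal usage sketch (the server URL and hash are hypothetical), mirroring
# what CMDdownload does below:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'deadbeef' * 5, storage=storage, cache=MemoryCache(),
#         outdir=u'/tmp/out', use_symlinks=False)
#   print(' '.join(bundle.command))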

def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
      relpath: isolated_format.file_to_metadata(
          os.path.join(root, relpath), {}, 0, algo, False)
      for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata

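# Shape of the returned |metadata| dict (values are hypothetical); each entry
# is a .isolated 'files' entry keyed by path relative to |root|, e.g.:
#
#   {u'dir/file.py': {'h': u'<digest>', 's': 1024, 'm': 0644}}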

def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')


  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)

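# Sketch of consuming the return value (variable names are illustrative):
#
#   results, cold, hot = archive_files_to_storage(storage, files, blacklist)
#   isolated_hash = results[0][0]  # The first entry is the root .isolated.
#   logging.info('%d items were actually uploaded (cold)', len(cold))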

def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0

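# Example invocation (the server URL is illustrative):
#
#   isolateserver.py archive -I https://isolate.example.com out/Release/files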

def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # |rel| already includes options.target; don't join it a second time.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0

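# Example invocations (the server URL and hash are illustrative):
#
#   isolateserver.py download -I https://isolate.example.com \
#       -s deadbeefdeadbeefdeadbeefdeadbeefdeadbeef -t out
#   isolateserver.py download -I https://isolate.example.com \
#       -f deadbeefdeadbeefdeadbeefdeadbeefdeadbeef payload.bin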

def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, **kwargs):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace),
        **kwargs)
  else:
    return MemoryCache()

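# Sketch (the extra kwargs are forwarded to DiskCache; 'trim' is assumed to be
# one of them, matching DiskCache._load() above). A DiskCache is returned only
# when --cache was given; otherwise everything is kept in memory:
#
#   cache = process_cache_options(options, trim=True)
#   with cache:
#     ...  # e.g. fetch_isolated(..., cache=cache, ...)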

class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))