blob: d3dafbbcd1f79b2cd585e51fc93656e65d07e52c [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Adrian Ludwin6d2a8342017-08-15 19:56:54 -04008__version__ = '0.8.1'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
nodir90bc8dc2016-06-15 13:35:21 -070010import errno
tansell9e04a8d2016-07-28 09:31:59 -070011import functools
12import io
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040014import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000016import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040017import signal
tansell9e04a8d2016-07-28 09:31:59 -070018import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import sys
tansell26de79e2016-11-13 18:41:11 -080020import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050021import tempfile
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000023import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000025from third_party import colorama
26from third_party.depot_tools import fix_encoding
27from third_party.depot_tools import subcommand
28
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050029from utils import file_path
maruel12e30012015-10-09 11:55:35 -070030from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040031from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040032from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000033from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040034from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070035from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000036from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000037from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080039import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040040import isolated_format
aludwin81178302016-11-30 17:18:49 -080041import isolate_storage
42from isolate_storage import Item
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080043
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000044
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regular expressions of relative paths to never archive.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
94
95
class Error(Exception):
  """Base exception for generic runtime errors raised by this module."""
99
100
class Aborted(Error):
  """Operation aborted.

  Raised by in-flight upload tasks once Storage.abort() has been called.
  """
  pass
104
105
class AlreadyExists(Error):
  """File already exists.

  Raised by create_symlinks() when the symlink target path exists (EEXIST).
  """
109
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of the file at |path| in blocks of |chunk_size| bytes.

  Reading starts at byte |offset|; the final chunk may be shorter.
  """
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
120
121
def file_write(path, content_generator):
  """Writes the chunks yielded by |content_generator| to the file at |path|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000138
139
def fileobj_path(fileobj):
  """Returns the file system path backing |fileobj|, or None.

  A path is only returned when it is absolute and currently exists, so it can
  be safely passed to file system operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # A file opened via e.g. open("test.txt") by code outside our control (such
  # as the standard library) carries a str name; all paths in this module are
  # unicode objects, so decode it. UTF-8 is (incorrectly) assumed everywhere.
  if not isinstance(name, unicode):
    name = name.decode('utf-8')

  # fs.exists asserts on relative paths, so bail out early for those.
  if not os.path.isabs(name):
    return None

  return name if fs.exists(name) else None
165
166
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  # With size == -1 the source must be positioned at its start, otherwise the
  # "copy everything until EOF" contract would silently copy a partial file.
  if size == -1 and hasattr(srcfileobj, 'tell') and srcfileobj.tell() != 0:
    raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    want = chunk_size if size <= 0 else min(chunk_size, size - copied)
    chunk = srcfileobj.read(want)
    if not chunk:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(chunk)
    copied += len(chunk)
194
195
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is read only when none of the write permission bits are set.
    # The previous expression was missing the negation, which inverted the
    # test: read-only files were copied while writable files were linked,
    # contradicting the comments below and allowing a task to corrupt the
    # cache through a hardlink to a writable file.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
258
259
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  Output chunk boundaries do not match input chunk boundaries; intermediate
  empty outputs are skipped. The final flushed block terminates the stream.
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  final = deflater.flush(zlib.Z_FINISH)
  if final:
    yield final
270
271
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  # Running total of input bytes consumed; only used for the error message.
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Limiting the output to |chunk_size| can leave part of the input
      # buffered in decompressor.unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the buffered tail before consuming the next input chunk.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
302
303
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types in ALREADY_COMPRESSED_TYPES, 7 for
  everything else.
  """
  # os.path.splitext() keeps the leading dot ('.png') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('png'); strip the dot so
  # the membership test can actually match. Without it the comparison always
  # failed and already-compressed content was recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
309
310
def create_directories(base_directory, files):
  """Creates, under |base_directory|, every directory needed by |files|."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each file's directory together with all of its ancestors.
  needed = set()
  for f in files:
    d = os.path.dirname(f)
    while d and d not in needed:
      needed.add(d)
      d = os.path.dirname(d)
  # Sorted order guarantees a parent is created before any of its children.
  for rel in sorted(needed):
    target = os.path.join(base_directory, rel)
    if not fs.isdir(target):
      fs.mkdir(target)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000325
326
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      # Not a symlink entry.
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % outfile)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000343
344
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file exists and its size matches the expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    # Without a size to compare, mere existence is good enough.
    return fs.isfile(path)
  try:
    on_disk = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if on_disk == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), on_disk, size)
  return False
365
366
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat the file when the caller did not already supply the size.
    actual_size = fs.stat(path).st_size if size is None else size
    super(FileItem, self).__init__(digest, actual_size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Streamed lazily from disk, chunk by chunk.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000384
385
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # The digest is left as None; it is computed later from the buffer
    # content via prepare() (see Storage.upload_items), while the size is
    # known immediately.
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # Wrapped in a list so the whole buffer iterates as a single chunk.
    return [self.buffer]
395
396
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000397class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800398 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000399
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800400 Implements compression support, parallel 'contains' checks, parallel uploads
401 and more.
402
403 Works only within single namespace (and thus hashing algorithm and compression
404 scheme are fixed).
405
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400406 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
407 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800408 """
409
  def __init__(self, storage_api):
    """Wraps a StorageApi instance.

    Arguments:
      storage_api: StorageApi instance performing the actual requests; its
          namespace determines the hashing algorithm and compression scheme.
    """
    self._storage_api = storage_api
    # Compress client-side only if the namespace asks for compression and the
    # backend does not already compress internally.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by their respective properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set (and never cleared) by abort(); checked by in-flight tasks.
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000419
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    Cached at construction time from |storage_api.namespace|.
    """
    return self._hash_algo
427
  @property
  def location(self):
    """URL of the backing store that this class is using.

    Proxied from the underlying StorageApi instance.
    """
    return self._storage_api.location
432
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    Proxied from the underlying StorageApi instance.
    """
    return self._storage_api.namespace
440
441 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000442 def cpu_thread_pool(self):
443 """ThreadPool for CPU-bound tasks like zipping."""
444 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500445 threads = max(threading_utils.num_processors(), 2)
446 if sys.maxsize <= 2L**32:
447 # On 32 bits userland, do not try to use more than 16 threads.
448 threads = min(threads, 16)
449 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000450 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000451
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000452 @property
453 def net_thread_pool(self):
454 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
455 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700456 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000457 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000458
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000459 def close(self):
460 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400461 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000462 if self._cpu_thread_pool:
463 self._cpu_thread_pool.join()
464 self._cpu_thread_pool.close()
465 self._cpu_thread_pool = None
466 if self._net_thread_pool:
467 self._net_thread_pool.join()
468 self._net_thread_pool.close()
469 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400470 logging.info('Done.')
471
472 def abort(self):
473 """Cancels any pending or future operations."""
474 # This is not strictly theadsafe, but in the worst case the logging message
475 # will be printed twice. Not a big deal. In other places it is assumed that
476 # unprotected reads and writes to _aborted are serializable (it is true
477 # for python) and thus no locking is used.
478 if not self._aborted:
479 logging.warning('Aborting... It can take a while.')
480 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000481
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000482 def __enter__(self):
483 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400484 assert not self._prev_sig_handlers, self._prev_sig_handlers
485 for s in (signal.SIGINT, signal.SIGTERM):
486 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000487 return self
488
489 def __exit__(self, _exc_type, _exc_value, _traceback):
490 """Context manager interface."""
491 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400492 while self._prev_sig_handlers:
493 s, h = self._prev_sig_handlers.popitem()
494 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000495 return False
496
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000497 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800498 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000499
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800500 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000501
502 Arguments:
503 items: list of Item instances that represents data to upload.
504
505 Returns:
506 List of items that were uploaded. All other items are already there.
507 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700508 logging.info('upload_items(items=%d)', len(items))
509
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800510 # Ensure all digests are calculated.
511 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700512 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800513
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000514 # For each digest keep only first Item that matches it. All other items
515 # are just indistinguishable copies from the point of view of isolate
516 # server (it doesn't care about paths at all, only content and digests).
517 seen = {}
518 duplicates = 0
519 for item in items:
520 if seen.setdefault(item.digest, item) is not item:
521 duplicates += 1
522 items = seen.values()
523 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700524 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000525
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000526 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000527 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000528 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800529 channel = threading_utils.TaskChannel()
530 for missing_item, push_state in self.get_missing_items(items):
531 missing.add(missing_item)
532 self.async_push(channel, missing_item, push_state)
533
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000534 # No need to spawn deadlock detector thread if there's nothing to upload.
535 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700536 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000537 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000538 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000539 detector.ping()
540 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000541 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000542 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000543 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000544 logging.info('All files are uploaded')
545
546 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000547 total = len(items)
548 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000549 logging.info(
550 'Total: %6d, %9.1fkb',
551 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000552 total_size / 1024.)
553 cache_hit = set(items) - missing
554 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000555 logging.info(
556 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
557 len(cache_hit),
558 cache_hit_size / 1024.,
559 len(cache_hit) * 100. / total,
560 cache_hit_size * 100. / total_size if total_size else 0)
561 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000562 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000563 logging.info(
564 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
565 len(cache_miss),
566 cache_miss_size / 1024.,
567 len(cache_miss) * 100. / total,
568 cache_miss_size * 100. / total_size if total_size else 0)
569
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000570 return uploaded
571
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Surface the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      # Hand off the network part of the work to the IO pool.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000623
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800624 def push(self, item, push_state):
625 """Synchronously pushes a single item to the server.
626
627 If you need to push many items at once, consider using 'upload_items' or
628 'async_push' with instance of TaskChannel.
629
630 Arguments:
631 item: item to upload as instance of Item class.
632 push_state: push state returned by 'get_missing_items' call for |item|.
633
634 Returns:
635 Pushed item (same object as |item|).
636 """
637 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700638 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800639 self.async_push(channel, item, push_state)
640 pushed = channel.pull()
641 assert pushed is item
642 return item
643
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000644 def async_fetch(self, channel, priority, digest, size, sink):
645 """Starts asynchronous fetch from the server in a parallel thread.
646
647 Arguments:
648 channel: TaskChannel that receives back |digest| when download ends.
649 priority: thread pool task priority for the fetch.
650 digest: hex digest of an item to download.
651 size: expected size of the item (after decompression).
652 sink: function that will be called as sink(generator).
653 """
654 def fetch():
655 try:
656 # Prepare reading pipeline.
Adrian Ludwinb4ebc092017-09-13 07:46:24 -0400657 stream = self._storage_api.fetch(digest, size, 0)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700658 if self._use_zip:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400659 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000660 # Run |stream| through verifier that will assert its size.
Adrian Ludwin6d2a8342017-08-15 19:56:54 -0400661 verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000662 # Verified stream goes to |sink|.
663 sink(verifier.run())
664 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800665 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000666 raise
667 return digest
668
669 # Don't bother with zip_thread_pool for decompression. Decompression is
670 # really fast and most probably IO bound anyway.
671 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
672
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000673 def get_missing_items(self, items):
674 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000675
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000676 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000677
678 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000679 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000680
681 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800682 For each missing item it yields a pair (item, push_state), where:
683 * item - Item object that is missing (one of |items|).
684 * push_state - opaque object that contains storage specific information
685 describing how to upload the item (for example in case of cloud
686 storage, it is signed upload URLs). It can later be passed to
687 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000688 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000689 channel = threading_utils.TaskChannel()
690 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800691
692 # Ensure all digests are calculated.
693 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700694 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800695
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400696 def contains(batch):
697 if self._aborted:
698 raise Aborted()
699 return self._storage_api.contains(batch)
700
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000701 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800702 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400703 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400704 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000705 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800706
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000707 # Yield results as they come in.
708 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800709 for missing_item, push_state in channel.pull().iteritems():
710 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000711
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000712
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800713def batch_items_for_check(items):
714 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000715
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800716 Each batch corresponds to a single 'exists?' query to the server via a call
717 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000718
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800719 Arguments:
720 items: a list of Item objects.
721
722 Yields:
723 Batches of items to query for existence in a single operation,
724 each batch is a list of Item objects.
725 """
726 batch_count = 0
727 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
728 next_queries = []
729 for item in sorted(items, key=lambda x: x.size, reverse=True):
730 next_queries.append(item)
731 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000732 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800733 next_queries = []
734 batch_count += 1
735 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
736 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
737 if next_queries:
738 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000739
740
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000741class FetchQueue(object):
742 """Fetches items from Storage and places them into LocalCache.
743
744 It manages multiple concurrent fetch operations. Acts as a bridge between
745 Storage and LocalCache so that Storage and LocalCache don't depend on each
746 other at all.
747 """
748
749 def __init__(self, storage, cache):
750 self.storage = storage
751 self.cache = cache
752 self._channel = threading_utils.TaskChannel()
753 self._pending = set()
754 self._accessed = set()
755 self._fetched = cache.cached_set()
756
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400757 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700758 self,
759 digest,
760 size=UNKNOWN_FILE_SIZE,
761 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000762 """Starts asynchronous fetch of item |digest|."""
763 # Fetching it now?
764 if digest in self._pending:
765 return
766
767 # Mark this file as in use, verify_all_cached will later ensure it is still
768 # in cache.
769 self._accessed.add(digest)
770
771 # Already fetched? Notify cache to update item's LRU position.
772 if digest in self._fetched:
773 # 'touch' returns True if item is in cache and not corrupted.
774 if self.cache.touch(digest, size):
775 return
776 # Item is corrupted, remove it from cache and fetch it again.
777 self._fetched.remove(digest)
778 self.cache.evict(digest)
779
780 # TODO(maruel): It should look at the free disk space, the current cache
781 # size and the size of the new item on every new item:
782 # - Trim the cache as more entries are listed when free disk space is low,
783 # otherwise if the amount of data downloaded during the run > free disk
784 # space, it'll crash.
785 # - Make sure there's enough free disk space to fit all dependencies of
786 # this run! If not, abort early.
787
788 # Start fetching.
789 self._pending.add(digest)
790 self.storage.async_fetch(
791 self._channel, priority, digest, size,
792 functools.partial(self.cache.write, digest))
793
794 def wait(self, digests):
795 """Starts a loop that waits for at least one of |digests| to be retrieved.
796
797 Returns the first digest retrieved.
798 """
799 # Flush any already fetched items.
800 for digest in digests:
801 if digest in self._fetched:
802 return digest
803
804 # Ensure all requested items are being fetched now.
805 assert all(digest in self._pending for digest in digests), (
806 digests, self._pending)
807
808 # Wait for some requested item to finish fetching.
809 while self._pending:
810 digest = self._channel.pull()
811 self._pending.remove(digest)
812 self._fetched.add(digest)
813 if digest in digests:
814 return digest
815
816 # Should never reach this point due to assert above.
817 raise RuntimeError('Impossible state')
818
819 def inject_local_file(self, path, algo):
820 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700821 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000822 data = f.read()
823 digest = algo(data).hexdigest()
824 self.cache.write(digest, [data])
825 self._fetched.add(digest)
826 return digest
827
828 @property
829 def pending_count(self):
830 """Returns number of items to be fetched."""
831 return len(self._pending)
832
833 def verify_all_cached(self):
834 """True if all accessed items are in cache."""
835 return self._accessed.issubset(self.cache.cached_set())
836
837
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    # Running byte count, accumulated by _inspect_chunk.
    self.current_size = 0
    # Incremental hash of all bytes seen so far.
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          # An IOError here is one the consumer threw back into the generator
          # at the yield point, i.e. a local cache write failure.
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      # Last chunk: _inspect_chunk validates size/digest before it is yielded.
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    On the last chunk, raises IOError if the total size or the digest does not
    match the expectation.
    """
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000909
910
nodir445097b2016-06-03 22:50:26 -0700911class CacheMiss(Exception):
912 """Raised when an item is not in cache."""
913
914 def __init__(self, digest):
915 self.digest = digest
916 super(CacheMiss, self).__init__(
917 'Item with digest %r is not found in cache' % digest)
918
919
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000920class LocalCache(object):
921 """Local cache that stores objects fetched via Storage.
922
923 It can be accessed concurrently from multiple threads, so it should protect
924 its internal state with some lock.
925 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -0500926 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000927
maruel064c0a32016-04-05 11:47:15 -0700928 def __init__(self):
929 self._lock = threading_utils.LockWithAssert()
930 # Profiling values.
931 self._added = []
932 self._initial_number_items = 0
933 self._initial_size = 0
934 self._evicted = []
tansell9e04a8d2016-07-28 09:31:59 -0700935 self._used = []
maruel064c0a32016-04-05 11:47:15 -0700936
nodirbe642ff2016-06-09 15:51:51 -0700937 def __contains__(self, digest):
938 raise NotImplementedError()
939
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000940 def __enter__(self):
941 """Context manager interface."""
942 return self
943
944 def __exit__(self, _exc_type, _exec_value, _traceback):
945 """Context manager interface."""
946 return False
947
maruel064c0a32016-04-05 11:47:15 -0700948 @property
949 def added(self):
950 return self._added[:]
951
952 @property
953 def evicted(self):
954 return self._evicted[:]
955
956 @property
tansell9e04a8d2016-07-28 09:31:59 -0700957 def used(self):
958 return self._used[:]
959
960 @property
maruel064c0a32016-04-05 11:47:15 -0700961 def initial_number_items(self):
962 return self._initial_number_items
963
964 @property
965 def initial_size(self):
966 return self._initial_size
967
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000968 def cached_set(self):
969 """Returns a set of all cached digests (always a new object)."""
970 raise NotImplementedError()
971
maruel36a963d2016-04-08 17:15:49 -0700972 def cleanup(self):
973 """Deletes any corrupted item from the cache and trims it if necessary."""
974 raise NotImplementedError()
975
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000976 def touch(self, digest, size):
977 """Ensures item is not corrupted and updates its LRU position.
978
979 Arguments:
980 digest: hash digest of item to check.
981 size: expected size of this item.
982
983 Returns:
984 True if item is in cache and not corrupted.
985 """
986 raise NotImplementedError()
987
988 def evict(self, digest):
989 """Removes item from cache if it's there."""
990 raise NotImplementedError()
991
tansell9e04a8d2016-07-28 09:31:59 -0700992 def getfileobj(self, digest):
993 """Returns a readable file like object.
994
995 If file exists on the file system it will have a .name attribute with an
996 absolute path to the file.
997 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000998 raise NotImplementedError()
999
1000 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001001 """Reads data from |content| generator and stores it in cache.
1002
1003 Returns digest to simplify chaining.
1004 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001005 raise NotImplementedError()
1006
maruele6fc9382017-05-04 09:03:48 -07001007 def trim(self):
1008 """Enforces cache policies.
1009
1010 Returns:
1011 Number of items evicted.
1012 """
1013 raise NotImplementedError()
1014
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001015
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Maps digest -> content blob. Guarded by self._lock.
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    # Nothing to do: in-memory content cannot be left in a corrupted state.
    pass

  def touch(self, digest, size):
    # Memory contents cannot get corrupted, so presence is sufficient.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # self._evicted is a list of evicted item sizes (mirroring self._added
        # which stores len() of added blobs). The previous code called
        # self._evicted.add(v), which raised AttributeError since lists have
        # no add(), and would have recorded the blob instead of its size.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
      return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0
1069
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001070
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001071class CachePolicies(object):
1072 def __init__(self, max_cache_size, min_free_space, max_items):
1073 """
1074 Arguments:
1075 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1076 cache is effectively a leak.
1077 - min_free_space: Trim if disk free space becomes lower than this value. If
1078 0, it unconditionally fill the disk.
1079 - max_items: Maximum number of items to keep in the cache. If 0, do not
1080 enforce a limit.
1081 """
1082 self.max_cache_size = max_cache_size
1083 self.min_free_space = min_free_space
1084 self.max_items = max_items
1085
1086
1087class DiskCache(LocalCache):
1088 """Stateful LRU cache in a flat hash table in a directory.
1089
1090 Saves its state as json file.
1091 """
maruel12e30012015-10-09 11:55:35 -07001092 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001093
maruele6fc9382017-05-04 09:03:48 -07001094 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001095 """
1096 Arguments:
1097 cache_dir: directory where to place the cache.
1098 policies: cache retention policies.
1099 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001100 trim: if True to enforce |policies| right away.
1101 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001102 """
maruel064c0a32016-04-05 11:47:15 -07001103 # All protected methods (starting with '_') except _path should be called
1104 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001105 super(DiskCache, self).__init__()
1106 self.cache_dir = cache_dir
1107 self.policies = policies
1108 self.hash_algo = hash_algo
1109 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001110 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001111 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001112 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001113 file_path.ensure_tree(self.cache_dir)
1114 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001115 # The first item in the LRU cache that must not be evicted during this run
1116 # since it was referenced. All items more recent that _protected in the LRU
1117 # cache are also inherently protected. It could be a set() of all items
1118 # referenced but this increases memory usage without a use case.
1119 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001120 # Cleanup operations done by self._load(), if any.
1121 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001122 with tools.Profiler('Setup'):
1123 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001124 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001125
nodirbe642ff2016-06-09 15:51:51 -07001126 def __contains__(self, digest):
1127 with self._lock:
1128 return digest in self._lru
1129
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001130 def __enter__(self):
1131 return self
1132
  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager exit: enforces cache policies and logs run statistics.

    Returns False so any in-flight exception propagates to the caller.
    """
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

        logging.info(
            '%5d (%8dkb) added',
            len(self._added), sum(self._added) / 1024)
        logging.info(
            '%5d (%8dkb) current',
            len(self._lru),
            sum(self._lru.itervalues()) / 1024)
        logging.info(
            '%5d (%8dkb) evicted',
            len(self._evicted), sum(self._evicted) / 1024)
        logging.info(
            ' %8dkb free',
            self._free_disk / 1024)
    return False
1152
1153 def cached_set(self):
1154 with self._lock:
1155 return self._lru.keys_set()
1156
maruel36a963d2016-04-08 17:15:49 -07001157 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001158 """Cleans up the cache directory.
1159
1160 Ensures there is no unknown files in cache_dir.
1161 Ensures the read-only bits are set correctly.
1162
1163 At that point, the cache was already loaded, trimmed to respect cache
1164 policies.
1165 """
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001166 with self._lock:
1167 fs.chmod(self.cache_dir, 0700)
1168 # Ensure that all files listed in the state still exist and add new ones.
1169 previous = self._lru.keys_set()
1170 # It'd be faster if there were a readdir() function.
1171 for filename in fs.listdir(self.cache_dir):
1172 if filename == self.STATE_FILE:
1173 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1174 continue
1175 if filename in previous:
1176 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1177 previous.remove(filename)
1178 continue
1179
1180 # An untracked file. Delete it.
1181 logging.warning('Removing unknown file %s from cache', filename)
1182 p = self._path(filename)
1183 if fs.isdir(p):
1184 try:
1185 file_path.rmtree(p)
1186 except OSError:
1187 pass
1188 else:
1189 file_path.try_remove(p)
maruel2e8d0f52016-07-16 07:51:29 -07001190 continue
1191
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001192 if previous:
1193 # Filter out entries that were not found.
1194 logging.warning('Removed %d lost files', len(previous))
1195 for filename in previous:
1196 self._lru.pop(filename)
1197 self._save()
maruel36a963d2016-04-08 17:15:49 -07001198
1199 # What remains to be done is to hash every single item to
1200 # detect corruption, then save to ensure state.json is up to date.
1201 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1202 # TODO(maruel): Let's revisit once directory metadata is stored in
1203 # state.json so only the files that had been mapped since the last cleanup()
1204 # call are manually verified.
1205 #
1206 #with self._lock:
1207 # for digest in self._lru:
1208 # if not isolated_format.is_valid_hash(
1209 # self._path(digest), self.hash_algo):
1210 # self.evict(digest)
1211 # logging.info('Deleted corrupted item: %s', digest)
1212
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001213 def touch(self, digest, size):
vadimsh129e5942017-01-04 16:42:46 -08001214 """Verifies an actual file is valid and bumps its LRU position.
1215
1216 Returns False if the file is missing or invalid. Doesn't kick it from LRU
1217 though (call 'evict' explicitly).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001218
1219 Note that is doesn't compute the hash so it could still be corrupted if the
1220 file size didn't change.
1221
1222 TODO(maruel): More stringent verification while keeping the check fast.
1223 """
1224 # Do the check outside the lock.
1225 if not is_valid_file(self._path(digest), size):
1226 return False
1227
1228 # Update it's LRU position.
1229 with self._lock:
1230 if digest not in self._lru:
1231 return False
1232 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001233 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001234 return True
1235
  def evict(self, digest):
    """Removes |digest| from the LRU state and deletes its backing file."""
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)
1242
tansell9e04a8d2016-07-28 09:31:59 -07001243 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001244 try:
tansell9e04a8d2016-07-28 09:31:59 -07001245 f = fs.open(self._path(digest), 'rb')
1246 with self._lock:
1247 self._used.append(self._lru[digest])
1248 return f
nodir445097b2016-06-03 22:50:26 -07001249 except IOError:
1250 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001251
  def write(self, digest, content):
    """Reads data from |content| generator and stores it in the cache on disk.

    Marks |digest| as protected from eviction for the rest of this run.
    Returns digest to simplify chaining.
    """
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      # 1) Inside |content| generator in case of network or unzipping errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001278
nodirf33b8d62016-10-26 22:34:58 -07001279 def get_oldest(self):
1280 """Returns digest of the LRU item or None."""
1281 try:
1282 return self._lru.get_oldest()[0]
1283 except KeyError:
1284 return None
1285
  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    # NOTE(review): reads LRU state without taking self._lock — confirm
    # callers serialize access.
    return self._lru.get_timestamp(digest)
1292
  def trim(self):
    """Forces retention policies.

    Returns:
      Number of items trimmed due to insufficient free disk space (the value
      returned by _trim()).
    """
    with self._lock:
      return self._trim()
nodirf33b8d62016-10-26 22:34:58 -07001297
maruele6fc9382017-05-04 09:03:48 -07001298 def _load(self, trim, time_fn):
maruel2e8d0f52016-07-16 07:51:29 -07001299 """Loads state of the cache from json file.
1300
1301 If cache_dir does not exist on disk, it is created.
1302 """
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001303 self._lock.assert_locked()
1304
maruel2e8d0f52016-07-16 07:51:29 -07001305 if not fs.isfile(self.state_file):
1306 if not os.path.isdir(self.cache_dir):
1307 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001308 else:
maruel2e8d0f52016-07-16 07:51:29 -07001309 # Load state of the cache.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001310 try:
1311 self._lru = lru.LRUDict.load(self.state_file)
1312 except ValueError as err:
1313 logging.error('Failed to load cache state: %s' % (err,))
1314 # Don't want to keep broken state file.
1315 file_path.try_remove(self.state_file)
maruele6fc9382017-05-04 09:03:48 -07001316 if time_fn:
1317 self._lru.time_fn = time_fn
nodirf33b8d62016-10-26 22:34:58 -07001318 if trim:
1319 self._trim()
maruel2e8d0f52016-07-16 07:51:29 -07001320 # We want the initial cache size after trimming, i.e. what is readily
1321 # avaiable.
1322 self._initial_number_items = len(self._lru)
1323 self._initial_size = sum(self._lru.itervalues())
1324 if self._evicted:
1325 logging.info(
1326 'Trimming evicted items with the following sizes: %s',
1327 sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001328
  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      # POSIX only: clear the directory read-only bit first, otherwise the
      # state file can't be (re)created. Presumably unnecessary on Windows
      # where the directory read-only attribute doesn't block file creation.
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)
1340
  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists.

    Enforces, in order: max_cache_size, max_items, then min_free_space.

    Returns:
      Number of items evicted due to the min_free_space constraint.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    # NOTE(review): loop termination relies on eviction (via
    # _remove_lru_file/_delete_file) refreshing self._free_disk — confirm.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001382
1383 def _path(self, digest):
1384 """Returns the path to one item."""
1385 return os.path.join(self.cache_dir, digest)
1386
maruel2e8d0f52016-07-16 07:51:29 -07001387 def _remove_lru_file(self, allow_protected):
1388 """Removes the lastest recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001389 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001390 try:
nodireabc11c2016-10-18 16:37:28 -07001391 digest, (size, _) = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001392 if not allow_protected and digest == self._protected:
maruele6fc9382017-05-04 09:03:48 -07001393 raise Error(
1394 'Not enough space to fetch the whole isolated tree; %sb free, min '
1395 'is %sb' % (self._free_disk, self.policies.min_free_space))
maruel083fa552016-04-08 14:38:01 -07001396 except KeyError:
1397 raise Error('Nothing to remove')
nodireabc11c2016-10-18 16:37:28 -07001398 digest, (size, _) = self._lru.pop_oldest()
vadimsh129e5942017-01-04 16:42:46 -08001399 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001400 self._delete_file(digest, size)
1401 return size
1402
  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one.

    Arguments:
      digest: content digest naming the entry; its file must already exist at
          self._path(digest) when size is UNKNOWN_FILE_SIZE.
      size: size of the entry in bytes; stat'ed from disk when left unknown.
    """
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    # Track the addition for stats and update the bookkept free space.
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space when
    # the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001422
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001423 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1424 """Deletes cache file from the file system."""
1425 self._lock.assert_locked()
1426 try:
1427 if size == UNKNOWN_FILE_SIZE:
vadimsh129e5942017-01-04 16:42:46 -08001428 try:
1429 size = fs.stat(self._path(digest)).st_size
1430 except OSError:
1431 size = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001432 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001433 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001434 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001435 except OSError as e:
vadimsh129e5942017-01-04 16:42:46 -08001436 if e.errno != errno.ENOENT:
1437 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001438
1439
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001440class IsolatedBundle(object):
1441 """Fetched and parsed .isolated file with all dependencies."""
1442
Vadim Shtayura3148e072014-09-02 18:51:52 -07001443 def __init__(self):
1444 self.command = []
1445 self.files = {}
1446 self.read_only = None
1447 self.relative_cwd = None
1448 # The main .isolated file, a IsolatedFile instance.
1449 self.root = None
1450
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001451 def fetch(self, fetch_queue, root_isolated_hash, algo):
1452 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001453
1454 It enables support for "included" .isolated files. They are processed in
1455 strict order but fetched asynchronously from the cache. This is important so
1456 that a file in an included .isolated file that is overridden by an embedding
1457 .isolated file is not fetched needlessly. The includes are fetched in one
1458 pass and the files are fetched as soon as all the ones on the left-side
1459 of the tree were fetched.
1460
1461 The prioritization is very important here for nested .isolated files.
1462 'includes' have the highest priority and the algorithm is optimized for both
1463 deep and wide trees. A deep one is a long link of .isolated files referenced
1464 one at a time by one item in 'includes'. A wide one has a large number of
1465 'includes' in a single .isolated file. 'left' is defined as an included
1466 .isolated file earlier in the 'includes' list. So the order of the elements
1467 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001468
1469 As a side effect this method starts asynchronous fetch of all data files
1470 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1471 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001472 """
1473 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1474
1475 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1476 pending = {}
1477 # Set of hashes of already retrieved items to refuse recursive includes.
1478 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001479 # Set of IsolatedFile's whose data files have already being fetched.
1480 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001481
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001482 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001483 h = isolated_file.obj_hash
1484 if h in seen:
1485 raise isolated_format.IsolatedError(
1486 'IsolatedFile %s is retrieved recursively' % h)
1487 assert h not in pending
1488 seen.add(h)
1489 pending[h] = isolated_file
1490 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1491
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001492 # Start fetching root *.isolated file (single file, not the whole bundle).
1493 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001494
1495 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001496 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001497 item_hash = fetch_queue.wait(pending)
1498 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001499 with fetch_queue.cache.getfileobj(item_hash) as f:
1500 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001501
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001502 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001503 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001504 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001505
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001506 # Always fetch *.isolated files in traversal order, waiting if necessary
1507 # until next to-be-processed node loads. "Waiting" is done by yielding
1508 # back to the outer loop, that waits until some *.isolated is loaded.
1509 for node in isolated_format.walk_includes(self.root):
1510 if node not in processed:
1511 # Not visited, and not yet loaded -> wait for it to load.
1512 if not node.is_loaded:
1513 break
1514 # Not visited and loaded -> process it and continue the traversal.
1515 self._start_fetching_files(node, fetch_queue)
1516 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001517
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001518 # All *.isolated files should be processed by now and only them.
1519 all_isolateds = set(isolated_format.walk_includes(self.root))
1520 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001521
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001522 # Extract 'command' and other bundle properties.
1523 for node in isolated_format.walk_includes(self.root):
1524 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001525 self.relative_cwd = self.relative_cwd or ''
1526
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001527 def _start_fetching_files(self, isolated, fetch_queue):
1528 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001529
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001530 Modifies self.files.
1531 """
maruel10bea7b2016-12-07 05:03:49 -08001532 files = isolated.data.get('files', {})
1533 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1534 for filepath, properties in files.iteritems():
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001535 # Root isolated has priority on the files being mapped. In particular,
1536 # overridden files must not be fetched.
1537 if filepath not in self.files:
1538 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001539
1540 # Make sure if the isolated is read only, the mode doesn't have write
1541 # bits.
1542 if 'm' in properties and self.read_only:
1543 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1544
1545 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001546 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001547 fetch_queue.add(
1548 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1549
1550 def _update_self(self, node):
1551 """Extracts bundle global parameters from loaded *.isolated file.
1552
1553 Will be called with each loaded *.isolated file in order of traversal of
1554 isolated include graph (see isolated_format.walk_includes).
1555 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001556 # Grabs properties.
1557 if not self.command and node.data.get('command'):
1558 # Ensure paths are correctly separated on windows.
1559 self.command = node.data['command']
1560 if self.command:
1561 self.command[0] = self.command[0].replace('/', os.path.sep)
1562 self.command = tools.fix_python_path(self.command)
1563 if self.read_only is None and node.data.get('read_only') is not None:
1564 self.read_only = node.data['read_only']
1565 if (self.relative_cwd is None and
1566 node.data.get('relative_cwd') is not None):
1567 self.relative_cwd = node.data['relative_cwd']
1568
1569
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001570def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001571 """Returns Storage class that can upload and download from |namespace|.
1572
1573 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001574 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001575 namespace: isolate namespace to operate in, also defines hashing and
1576 compression scheme used, i.e. namespace names that end with '-gzip'
1577 store compressed data.
1578
1579 Returns:
1580 Instance of Storage.
1581 """
aludwin81178302016-11-30 17:18:49 -08001582 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001583
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001584
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001585def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001586 """Uploads the given tree to the given url.
1587
1588 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001589 base_url: The url of the isolate server to upload to.
1590 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001591 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001592 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001593 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001594 # Filter out symlinks, since they are not represented by items on isolate
1595 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001596 items = []
1597 seen = set()
1598 skipped = 0
1599 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001600 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001601 if 'l' not in metadata and filepath not in seen:
1602 seen.add(filepath)
1603 item = FileItem(
1604 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001605 digest=metadata['h'],
1606 size=metadata['s'],
1607 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001608 items.append(item)
1609 else:
1610 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001611
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001612 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001613 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001614 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001615
1616
maruel4409e302016-07-19 14:25:51 -07001617def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001618 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001619
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001620 Arguments:
1621 isolated_hash: hash of the root *.isolated file.
1622 storage: Storage class that communicates with isolate storage.
1623 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001624 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001625 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001626
1627 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001628 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001629 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001630 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001631 'fetch_isolated(%s, %s, %s, %s, %s)',
1632 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001633 # Hash algorithm to use, defined by namespace |storage| is using.
1634 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001635 with cache:
1636 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001637 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001638
1639 with tools.Profiler('GetIsolateds'):
1640 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001641 if not isolated_format.is_valid_hash(isolated_hash, algo):
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001642 logging.debug('%s is not a valid hash, assuming a file '
1643 '(algo was %s, hash size was %d)',
1644 isolated_hash, algo(), algo().digest_size)
maruel1ceb3872015-10-14 06:10:44 -07001645 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001646 try:
maruel1ceb3872015-10-14 06:10:44 -07001647 isolated_hash = fetch_queue.inject_local_file(path, algo)
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001648 except IOError as e:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001649 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001650 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001651 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001652
1653 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001654 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001655
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001656 with tools.Profiler('GetRest'):
1657 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001658 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001659 create_directories(outdir, bundle.files)
1660 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001661
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001662 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001663 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001664 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001665
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001666 # Multimap: digest -> list of pairs (path, props).
1667 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001668 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001669 if 'h' in props:
1670 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001671
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001672 # Now block on the remaining files to be downloaded and mapped.
1673 logging.info('Retrieving remaining files (%d of them)...',
1674 fetch_queue.pending_count)
1675 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001676 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001677 while remaining:
1678 detector.ping()
1679
1680 # Wait for any item to finish fetching to cache.
1681 digest = fetch_queue.wait(remaining)
1682
tansell9e04a8d2016-07-28 09:31:59 -07001683 # Create the files in the destination using item in cache as the
1684 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001685 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001686 fullpath = os.path.join(outdir, filepath)
1687
1688 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001689 filetype = props.get('t', 'basic')
1690
1691 if filetype == 'basic':
1692 file_mode = props.get('m')
1693 if file_mode:
1694 # Ignore all bits apart from the user
1695 file_mode &= 0700
1696 putfile(
1697 srcfileobj, fullpath, file_mode,
1698 use_symlink=use_symlinks)
1699
tansell26de79e2016-11-13 18:41:11 -08001700 elif filetype == 'tar':
1701 basedir = os.path.dirname(fullpath)
Marc-Antoine Ruelffd80132017-12-04 16:00:02 -05001702 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1703 for ti in t:
tansell26de79e2016-11-13 18:41:11 -08001704 if not ti.isfile():
1705 logging.warning(
1706 'Path(%r) is nonfile (%s), skipped',
1707 ti.name, ti.type)
1708 continue
1709 fp = os.path.normpath(os.path.join(basedir, ti.name))
1710 if not fp.startswith(basedir):
1711 logging.error(
1712 'Path(%r) is outside root directory',
1713 fp)
Marc-Antoine Ruelffd80132017-12-04 16:00:02 -05001714 ifd = t.extractfile(ti)
tansell26de79e2016-11-13 18:41:11 -08001715 file_path.ensure_tree(os.path.dirname(fp))
1716 putfile(ifd, fp, 0700, ti.size)
1717
tanselle4288c32016-07-28 09:45:40 -07001718 else:
1719 raise isolated_format.IsolatedError(
1720 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001721
1722 # Report progress.
1723 duration = time.time() - last_update
1724 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1725 msg = '%d files remaining...' % len(remaining)
1726 print msg
1727 logging.info(msg)
1728 last_update = time.time()
1729
1730 # Cache could evict some items we just tried to fetch, it's a fatal error.
1731 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001732 raise isolated_format.MappingError(
1733 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001734 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001735
1736
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001737def directory_to_metadata(root, algo, blacklist):
1738 """Returns the FileItem list and .isolated metadata for a directory."""
1739 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001740 paths = isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruel7a68f712017-12-01 18:45:18 -05001741 root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001742 metadata = {
1743 relpath: isolated_format.file_to_metadata(
kjlubick80596f02017-04-28 08:13:19 -07001744 os.path.join(root, relpath), {}, 0, algo, False)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001745 for relpath in paths
1746 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001747 for v in metadata.itervalues():
1748 v.pop('t')
1749 items = [
1750 FileItem(
1751 path=os.path.join(root, relpath),
1752 digest=meta['h'],
1753 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001754 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001755 for relpath, meta in metadata.iteritems() if 'h' in meta
1756 ]
1757 return items, metadata
1758
1759
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001760def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001761 """Stores every entries and returns the relevant data.
1762
1763 Arguments:
1764 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001765 files: list of file paths to upload. If a directory is specified, a
1766 .isolated file is created and its hash is returned.
1767 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001768
1769 Returns:
1770 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1771 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001772 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001773 assert all(isinstance(i, unicode) for i in files), files
1774 if len(files) != len(set(map(os.path.abspath, files))):
1775 raise Error('Duplicate entries found.')
1776
maruel064c0a32016-04-05 11:47:15 -07001777 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001778 results = []
1779 # The temporary directory is only created as needed.
1780 tempdir = None
1781 try:
1782 # TODO(maruel): Yield the files to a worker thread.
1783 items_to_upload = []
1784 for f in files:
1785 try:
1786 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001787 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001788 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001789 items, metadata = directory_to_metadata(
1790 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001791
1792 # Create the .isolated file.
1793 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001794 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1795 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001796 os.close(handle)
1797 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001798 'algo':
1799 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001800 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001801 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001802 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001803 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001804 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001805 items_to_upload.extend(items)
1806 items_to_upload.append(
1807 FileItem(
1808 path=isolated,
1809 digest=h,
maruel12e30012015-10-09 11:55:35 -07001810 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001811 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001812 results.append((h, f))
1813
maruel12e30012015-10-09 11:55:35 -07001814 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001815 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001816 items_to_upload.append(
1817 FileItem(
1818 path=filepath,
1819 digest=h,
maruel12e30012015-10-09 11:55:35 -07001820 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001821 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001822 results.append((h, f))
1823 else:
1824 raise Error('%s is neither a file or directory.' % f)
1825 except OSError:
1826 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07001827 uploaded = storage.upload_items(items_to_upload)
1828 cold = [i for i in items_to_upload if i in uploaded]
1829 hot = [i for i in items_to_upload if i not in uploaded]
1830 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001831 finally:
maruel12e30012015-10-09 11:55:35 -07001832 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001833 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001834
1835
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001836def archive(out, namespace, files, blacklist):
1837 if files == ['-']:
1838 files = sys.stdin.readlines()
1839
1840 if not files:
1841 raise Error('Nothing to upload')
1842
1843 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001844 blacklist = tools.gen_blacklist(blacklist)
1845 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001846 # Ignore stats.
1847 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001848 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1849
1850
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001851@subcommand.usage('<file1..fileN> or - to read from stdin')
1852def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001853 """Archives data to the server.
1854
1855 If a directory is specified, a .isolated file is created the whole directory
1856 is uploaded. Then this .isolated file can be included in another one to run
1857 commands.
1858
1859 The commands output each file that was processed with its content hash. For
1860 directories, the .isolated generated for the directory is listed as the
1861 directory entry itself.
1862 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001863 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001864 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001865 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001866 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001867 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001868 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001869 except Error as e:
1870 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001871 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001872
1873
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  # NOTE: the docstring above is user-visible help text (via subcommand).
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Best effort: failure to enable symlink support is logged, not fatal.
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of the two download modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  # Tree mode refuses to overwrite an existing non-empty target directory.
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested file has been written.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        # NOTE(review): |rel| is already an absolute path under target, so the
        # second join is a no-op (join returns its absolute second argument) —
        # looks redundant, confirm before simplifying.
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print('  ' + ' '.join(bundle.command))

  return 0
1952
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001953def add_archive_options(parser):
1954 parser.add_option(
1955 '--blacklist',
1956 action='append', default=list(DEFAULT_BLACKLIST),
1957 help='List of regexp to use as blacklist filter when uploading '
1958 'directories')
1959
1960
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001961def add_isolate_server_options(parser):
1962 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001963 parser.add_option(
1964 '-I', '--isolate-server',
1965 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001966 help='URL of the Isolate Server to use. Defaults to the environment '
1967 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1968 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001969 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001970 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001971 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001972 '--namespace', default='default-gzip',
1973 help='The namespace to use on the Isolate Server, default: %default')
1974
1975
nodir55be77b2016-05-03 09:39:57 -07001976def process_isolate_server_options(
1977 parser, options, set_exception_handler, required):
1978 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001979
1980 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001981 """
1982 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001983 if required:
1984 parser.error('--isolate-server is required.')
1985 return
1986
aludwind7b7b7e2017-06-29 16:38:50 -07001987 if options.grpc_proxy:
1988 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001989 else:
1990 try:
1991 options.isolate_server = net.fix_url(options.isolate_server)
1992 except ValueError as e:
1993 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001994 if set_exception_handler:
1995 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001996 try:
1997 return auth.ensure_logged_in(options.isolate_server)
1998 except ValueError as e:
1999 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002000
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002001
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002002def add_cache_options(parser):
2003 cache_group = optparse.OptionGroup(parser, 'Cache management')
2004 cache_group.add_option(
2005 '--cache', metavar='DIR',
2006 help='Directory to keep a local cache of the files. Accelerates download '
2007 'by reusing already downloaded files. Default=%default')
2008 cache_group.add_option(
2009 '--max-cache-size',
2010 type='int',
2011 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002012 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002013 help='Trim if the cache gets larger than this value, default=%default')
2014 cache_group.add_option(
2015 '--min-free-space',
2016 type='int',
2017 metavar='NNN',
2018 default=2*1024*1024*1024,
2019 help='Trim if disk free space becomes lower than this value, '
2020 'default=%default')
2021 cache_group.add_option(
2022 '--max-items',
2023 type='int',
2024 metavar='NNN',
2025 default=100000,
2026 help='Trim if more than this number of items are in the cache '
2027 'default=%default')
2028 parser.add_option_group(cache_group)
2029
2030
maruele6fc9382017-05-04 09:03:48 -07002031def process_cache_options(options, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002032 if options.cache:
2033 policies = CachePolicies(
2034 options.max_cache_size, options.min_free_space, options.max_items)
2035
2036 # |options.cache| path may not exist until DiskCache() instance is created.
2037 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002038 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002039 policies,
nodirf33b8d62016-10-26 22:34:58 -07002040 isolated_format.get_hash_algo(options.namespace),
maruele6fc9382017-05-04 09:03:48 -07002041 **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002042 else:
2043 return MemoryCache()
2044
2045
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002046class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002047 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002048 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002049 self,
2050 version=__version__,
2051 prog=os.path.basename(sys.modules[__name__].__file__),
2052 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002053 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002054
2055 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002056 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002057 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002058 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002059 return options, args
2060
2061
def main(args):
  """Dispatches |args| to the matching CMD* subcommand in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002065
2066
if __name__ == '__main__':
  # Presumably suppresses OS-level crash dialogs (e.g. Windows error popups)
  # that would hang an unattended run; see subprocess42 — TODO confirm.
  subprocess42.inhibit_os_error_reporting()
  # Normalizes stdout/stderr encoding before anything is printed.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables ANSI color escape handling (notably on Windows consoles).
  colorama.init()
  sys.exit(main(sys.argv[1:]))