#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.1'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


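# Illustrative sketch, not part of the original module: file_write() consumes
# a generator of byte chunks and file_read() produces one, so the two compose
# into a streamed round trip. The temp directory below is arbitrary.
def _example_file_write_read():
  tmp_dir = tempfile.mkdtemp()
  path = os.path.join(tmp_dir, u'chunked.bin')
  written = file_write(path, ('chunk-%d ' % i for i in xrange(3)))
  data = ''.join(file_read(path, chunk_size=4))
  assert written == len(data)
  return data

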
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (for
  # example the standard library's open("test.txt")), name ends up being a
  # str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError is raised). Otherwise all data until the EOF
  marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks share the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


261 """Reads chunks from |content_generator| and yields zip compressed chunks."""
262 compressor = zlib.compressobj(level)
263 for chunk in content_generator:
264 compressed = compressor.compress(chunk)
265 if compressed:
266 yield compressed
267 tail = compressor.flush(zlib.Z_FINISH)
268 if tail:
269 yield tail
270
271
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400272def zip_decompress(
273 content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000274 """Reads zipped data from |content_generator| and yields decompressed data.
275
276 Decompresses data in small chunks (no larger than |chunk_size|) so that
277 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
278
279 Raises IOError if data is corrupted or incomplete.
280 """
281 decompressor = zlib.decompressobj()
282 compressed_size = 0
283 try:
284 for chunk in content_generator:
285 compressed_size += len(chunk)
286 data = decompressor.decompress(chunk, chunk_size)
287 if data:
288 yield data
289 while decompressor.unconsumed_tail:
290 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
291 if data:
292 yield data
293 tail = decompressor.flush()
294 if tail:
295 yield tail
296 except zlib.error as e:
297 raise IOError(
298 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
299 # Ensure all data was read and decompressed.
300 if decompressor.unused_data or decompressor.unconsumed_tail:
301 raise IOError('Not all data was decompressed')
302
303
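# Illustrative sketch, not part of the original module: zip_compress() and
# zip_decompress() are inverses over chunk generators, which is how uploads
# and downloads stay streamed end to end instead of buffering whole files.
def _example_zip_round_trip():
  original = ['hello ', 'isolate ', 'server']
  compressed = list(zip_compress(original))
  restored = ''.join(zip_decompress(compressed, chunk_size=4))
  assert restored == ''.join(original)
  return restored

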
def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; strip it so the extension matches the
  # dotless entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Compute the full set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


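# Illustrative sketch, not part of the original module: create_directories()
# takes paths relative to |base_directory| and materializes every parent
# directory, so a deep tree can be mapped before any file is written.
def _example_create_directories():
  base = tempfile.mkdtemp()
  files = [os.path.join(u'a', u'b', u'c.txt'), os.path.join(u'a', u'd.txt')]
  create_directories(base, files)
  assert fs.isdir(os.path.join(base, u'a', u'b'))
  return base

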
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks that the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads large sets of files via a StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (true for Python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn a deadlock detector thread if there's nothing to
    # upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through a verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific
            information describing how to upload the item (for example in
            case of cloud storage, it is signed upload URLs). It can later be
            passed to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


714 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000715
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800716 Each batch corresponds to a single 'exists?' query to the server via a call
717 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000718
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800719 Arguments:
720 items: a list of Item objects.
721
722 Yields:
723 Batches of items to query for existence in a single operation,
724 each batch is a list of Item objects.
725 """
726 batch_count = 0
727 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
728 next_queries = []
729 for item in sorted(items, key=lambda x: x.size, reverse=True):
730 next_queries.append(item)
731 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000732 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800733 next_queries = []
734 batch_count += 1
735 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
736 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
737 if next_queries:
738 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000739
740
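# Illustrative sketch, not part of the original module: the batch schedule
# follows ITEMS_PER_CONTAINS_QUERIES, so 150 items of distinct sizes come out
# as batches of 20, 20, 50, 50 and a final 10.
def _example_batch_sizes():
  items = [BufferItem('x' * i) for i in xrange(1, 151)]
  return [len(batch) for batch in batch_items_for_check(items)]

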
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts an asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update the item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


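# Illustrative sketch, not part of the original module: FetchQueue pipelines
# downloads into a cache. |storage| and |cache| are assumed to be a Storage
# and a LocalCache built by the caller; the digest/size pairs would come from
# an .isolated file in practice.
def _example_fetch_all(storage, cache, digests_and_sizes):
  queue = FetchQueue(storage, cache)
  for digest, size in digests_and_sizes:
    queue.add(digest, size)
  pending = set(digest for digest, _ in digests_and_sizes)
  while pending:
    # wait() blocks until one of the requested digests lands in the cache.
    pending.discard(queue.wait(pending))

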
class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)


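# Illustrative sketch, not part of the original module: verifying a stream
# against a known digest. hashlib is imported locally since the module itself
# does not need it; sha1 matches the default isolate namespace hash.
def _example_verified_stream():
  import hashlib
  chunks = ['isolate', 'd']
  digest = hashlib.sha1(''.join(chunks)).hexdigest()
  verifier = FetchStreamVerifier(iter(chunks), hashlib.sha1, digest, 8)
  # run() re-yields the chunks, raising IOError on size or digest mismatch.
  return ''.join(verifier.run())

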
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file-like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # |_evicted| is a list of evicted sizes; calling 'add' on a list would
        # raise AttributeError, so record the size of the evicted content.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value.
      If 0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


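# Illustrative sketch, not part of the original module: a policy set capped at
# 20GiB and 100k items that also keeps at least 2GiB of disk free. The numbers
# are arbitrary; a value of 0 disables the corresponding limit.
_EXAMPLE_POLICIES = CachePolicies(
    max_cache_size=20 * 1024 * 1024 * 1024,
    min_free_space=2 * 1024 * 1024 * 1024,
    max_items=100000)

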
1087class DiskCache(LocalCache):
1088 """Stateful LRU cache in a flat hash table in a directory.
1089
1090 Saves its state as json file.
1091 """
maruel12e30012015-10-09 11:55:35 -07001092 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001093
maruele6fc9382017-05-04 09:03:48 -07001094 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001095 """
1096 Arguments:
1097 cache_dir: directory where to place the cache.
1098 policies: cache retention policies.
1099 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001100 trim: if True to enforce |policies| right away.
1101 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001102 """
maruel064c0a32016-04-05 11:47:15 -07001103 # All protected methods (starting with '_') except _path should be called
1104 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001105 super(DiskCache, self).__init__()
1106 self.cache_dir = cache_dir
1107 self.policies = policies
1108 self.hash_algo = hash_algo
1109 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001110 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001111 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001112 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001113 file_path.ensure_tree(self.cache_dir)
1114 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001115 # The first item in the LRU cache that must not be evicted during this run
1116 # since it was referenced. All items more recent that _protected in the LRU
1117 # cache are also inherently protected. It could be a set() of all items
1118 # referenced but this increases memory usage without a use case.
1119 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001120 # Cleanup operations done by self._load(), if any.
1121 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001122 with tools.Profiler('Setup'):
1123 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001124 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001125
nodirbe642ff2016-06-09 15:51:51 -07001126 def __contains__(self, digest):
1127 with self._lock:
1128 return digest in self._lru
1129
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001130 def __enter__(self):
1131 return self
1132
1133 def __exit__(self, _exc_type, _exec_value, _traceback):
1134 with tools.Profiler('CleanupTrimming'):
1135 with self._lock:
1136 self._trim()
1137
1138 logging.info(
1139 '%5d (%8dkb) added',
1140 len(self._added), sum(self._added) / 1024)
1141 logging.info(
1142 '%5d (%8dkb) current',
1143 len(self._lru),
1144 sum(self._lru.itervalues()) / 1024)
1145 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001146 '%5d (%8dkb) evicted',
1147 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001148 logging.info(
1149 ' %8dkb free',
1150 self._free_disk / 1024)
1151 return False
1152
1153 def cached_set(self):
1154 with self._lock:
1155 return self._lru.keys_set()
1156
maruel36a963d2016-04-08 17:15:49 -07001157 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001158 """Cleans up the cache directory.
1159
1160 Ensures there is no unknown files in cache_dir.
1161 Ensures the read-only bits are set correctly.
1162
1163 At that point, the cache was already loaded, trimmed to respect cache
1164 policies.
1165 """
1166 fs.chmod(self.cache_dir, 0700)
1167 # Ensure that all files listed in the state still exist and add new ones.
1168 previous = self._lru.keys_set()
1169 # It'd be faster if there were a readdir() function.
1170 for filename in fs.listdir(self.cache_dir):
1171 if filename == self.STATE_FILE:
1172 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1173 continue
1174 if filename in previous:
1175 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1176 previous.remove(filename)
1177 continue
1178
1179 # An untracked file. Delete it.
1180 logging.warning('Removing unknown file %s from cache', filename)
1181 p = self._path(filename)
1182 if fs.isdir(p):
1183 try:
1184 file_path.rmtree(p)
1185 except OSError:
1186 pass
1187 else:
1188 file_path.try_remove(p)
1189 continue
1190
1191 if previous:
1192 # Filter out entries that were not found.
1193 logging.warning('Removed %d lost files', len(previous))
1194 for filename in previous:
1195 self._lru.pop(filename)
maruele6fc9382017-05-04 09:03:48 -07001196 self._save()
maruel36a963d2016-04-08 17:15:49 -07001197
1198 # What remains to be done is to hash every single item to
1199 # detect corruption, then save to ensure state.json is up to date.
1200 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1201 # TODO(maruel): Let's revisit once directory metadata is stored in
1202 # state.json so only the files that had been mapped since the last cleanup()
1203 # call are manually verified.
1204 #
1205 #with self._lock:
1206 # for digest in self._lru:
1207 # if not isolated_format.is_valid_hash(
1208 # self._path(digest), self.hash_algo):
1209 # self.evict(digest)
1210 # logging.info('Deleted corrupted item: %s', digest)
1211
  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from the
    LRU though (call 'evict' explicitly).

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have the
    # write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator, in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself, in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

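  # A hedged usage sketch for write(): |content| is expected to be an
  # iterable of byte chunks (e.g. what a streaming download yields), so the
  # whole file never has to fit in memory. |digest| below is assumed to be
  # the hash of the concatenated chunks under this cache's hash algorithm.
  #
  #   def chunks():
  #     yield 'hello '
  #     yield 'world'
  #   cache.write(digest, chunks())
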
  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads the state of the cache from the json state file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
      if fs.isfile(self.state_file):
        file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Enforces the retention policies and ensures enough free disk space.

    Returns the number of items evicted due to insufficient free disk space.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

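  # The three retention policies enforced by _trim() above, in order:
  # maximum total size, maximum item count, then minimum free disk space.
  # A minimal standalone sketch of the same loop structure (names are
  # illustrative, not this class's attributes):
  #
  #   while total_size > policies.max_cache_size:          # policy 1
  #     total_size -= evict_oldest()
  #   for _ in xrange(item_count - policies.max_items):    # policy 2
  #     evict_oldest()
  #   while free_disk < policies.min_free_space:           # policy 3
  #     evict_oldest()
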
  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error(
            'Not enough space to fetch the whole isolated tree; %sb free, min '
            'is %sb' % (self._free_disk, self.policies.min_free_space))
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not the cache size limits. It doesn't look at the real free disk space,
    # only at its cached value. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to
    # map an isolated tree that is larger than the current amount of free disk
    # space when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for the data files
    to finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, then parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure that if the isolated is read-only, the mode doesn't have
        # write bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grab properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


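# Illustration of the fetch order implemented by IsolatedBundle.fetch() for a
# hypothetical include tree (hashes elided):
#
#   root.isolated  includes: [a.isolated, b.isolated]
#   a.isolated     includes: [c.isolated]
#
# The *.isolated files themselves are enqueued at PRIORITY_HIGH as soon as
# they are discovered; their data files are then enqueued in traversal order
# (root, a, c, b per the docstrings above), so an entry overridden by an
# embedding .isolated is registered first and never fetched needlessly.

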
def get_storage(url, namespace):
  """Returns a Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use as shared cloud-based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression schemes used, e.g. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(isolate_storage.get_storage_api(url, namespace))


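# Minimal usage sketch for get_storage() (the server URL is a placeholder);
# Storage is a context manager, as used by archive() below:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     storage.upload_items(items)

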
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skipping duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)


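# Hedged example of the |infiles| shape upload_tree() expects; the keys
# follow the .isolated metadata convention used above ('h' hash, 's' size,
# 'l' symlink, optional 'priority'), and all values are placeholders:
#
#   infiles = [
#     (u'/abs/path/foo.txt', {'h': 'deadbeef...', 's': 123}),
#     (u'/abs/path/link', {'l': u'foo.txt'}),  # symlinks are filtered out
#   ]
#   upload_tree('https://isolate.example.com', infiles, 'default-gzip')

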
def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map the file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create the file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'tar':
                basedir = os.path.dirname(fullpath)
                with tarfile.TarFile(fileobj=srcfileobj) as extractor:
                  for ti in extractor:
                    if not ti.isfile():
                      logging.warning(
                          'Path(%r) is nonfile (%s), skipped',
                          ti.name, ti.type)
                      continue
                    fp = os.path.normpath(os.path.join(basedir, ti.name))
                    if not fp.startswith(basedir):
                      logging.error(
                          'Path(%r) is outside root directory',
                          fp)
                    ifd = extractor.extractfile(ti)
                    file_path.ensure_tree(os.path.dirname(fp))
                    putfile(ifd, fp, 0700, ti.size)

              elif filetype == 'ar':
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  if not fp.startswith(basedir):
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r', filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # The cache could have evicted some items we just tried to fetch; it's a
  # fatal error.
  if not fetch_queue.verify_all_cached():
    raise isolated_format.MappingError(
        'Cache is too small to hold all requested files')
  return bundle


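# Usage sketch tying the above together (the URL, hash and paths are
# placeholders; MemoryCache avoids any on-disk state):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     bundle = fetch_isolated(
#         '0123abcd...', storage, MemoryCache(), u'/tmp/out',
#         use_symlinks=False)
#     print bundle.command, bundle.relative_cwd

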
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


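# Note on the tuple returned above: 'cold' lists the items that actually had
# to be uploaded, 'hot' the ones the server already had, which makes upload
# statistics cheap to derive. Illustrative:
#
#   results, cold, hot = archive_files_to_storage(storage, files, blacklist)
#   logging.info(
#       'Uploaded %d items (%d bytes), %d were already on the server',
#       len(cold), sum(i.size for i in cold), len(hot))

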
def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


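# Illustrative invocations of the archive subcommand (the server URL is a
# placeholder):
#
#   isolateserver.py archive -I https://isolate.example.com file1 dir1
#   echo file1 | isolateserver.py archive -I https://isolate.example.com -

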
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0


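# Illustrative invocations of the download subcommand (the URL and hash are
# placeholders):
#
#   isolateserver.py download -I https://isolate.example.com \
#       -s 0123abcd... -t out/
#   isolateserver.py download -I https://isolate.example.com \
#       -f 0123abcd... dest.txt

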
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy',
      help='gRPC proxy through which to communicate with Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, **kwargs):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace),
        **kwargs)
  else:
    return MemoryCache()


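# Sketch of how the cache options combine (values are placeholders): with
# --cache, downloads are backed by a DiskCache trimmed per the policies
# above; without it, a throwaway MemoryCache is used.
#
#   isolateserver.py download --cache /tmp/isolate-cache \
#       --max-cache-size 10737418240 --min-free-space 2147483648 \
#       -I https://isolate.example.com -s 0123abcd...

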
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))