#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.0'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more items per request, the lower the effect of HTTP round trip latency
# and TCP-level chattiness. On the other hand, larger values cause longer
# lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30

DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

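# Illustrative sketch (not part of the original module): shows the kind of
# paths the default blacklist regexps are meant to match.
def _example_default_blacklist():
  assert re.match(DEFAULT_BLACKLIST[0], 'foo.pyc')
  assert re.match(DEFAULT_BLACKLIST[1], os.path.join('repo', '.git'))
  assert not re.match(DEFAULT_BLACKLIST[0], 'foo.py')
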

class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created using something like open("test.txt"),
  # possibly by a function outside our control (such as the standard library),
  # name will end up being a str. We want all our paths to be unicode objects,
  # so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks share the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # The file is read-only if none of the write permission bits are set.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read-only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read-only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


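# Illustrative sketch (not part of the original module): a typical putfile()
# call materializes a cached file into a run directory, hardlinking when the
# requested mode is read-only. The paths below are hypothetical.
def _example_putfile():
  with fs.open(u'/cache/abc123', 'rb') as src:
    putfile(src, u'/run/out/data.bin', file_mode=0400, use_symlink=False)

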
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

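
# Illustrative sketch (not part of the original module): zip_compress() and
# zip_decompress() are inverse chunk generators, so content round-trips.
def _example_zip_roundtrip():
  chunks = ['hello ', 'isolate ', 'server']
  compressed = list(zip_compress(iter(chunks)))
  assert ''.join(zip_decompress(iter(compressed))) == ''.join(chunks)
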

def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; drop it so the extension matches
  # the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1][1:].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

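# Illustrative sketch (not part of the original module): already-compressed
# media keeps level 0 (store), everything else gets level 7.
def _example_compression_levels():
  assert get_zip_compression_level('photo.JPG') == 0
  assert get_zip_compression_level('binary.dll') == 7
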

def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)

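# Illustrative sketch (not part of the original module): for the files below,
# create_directories() derives the parent set {'a', 'a/b'} and creates the
# directories parents-first thanks to the sorted() walk. The base directory is
# hypothetical and assumed to already exist.
def _example_create_directories():
  create_directories(u'/tmp/run', [u'a/b/c.txt', u'a/d.txt'])
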

def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks that the file exists and that its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


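# Illustrative sketch (not part of the original module): the typical archival
# flow. get_storage() and the server URL are assumptions (not shown in this
# excerpt); the context manager installs signal handlers and tears down the
# thread pools on exit.
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     items = [FileItem(path) for path in paths_to_archive]
#     uploaded = storage.upload_items(items)

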
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

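
# Illustrative sketch (not part of the original module): called with exactly
# 150 Item objects, the batch sizes follow ITEMS_PER_CONTAINS_QUERIES, largest
# items first: 20, 20, 50, 50 and a final partial batch of 10.
def _example_batching(items):
  sizes = [len(batch) for batch in batch_items_for_check(items)]
  assert sizes == [20, 20, 50, 50, 10], sizes
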

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

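# Illustrative sketch (not part of the original module): a FetchQueue pipelines
# downloads into a cache; wait() unblocks as soon as any requested digest is
# retrieved. The |storage| argument and the digest values are hypothetical.
def _example_fetch_queue(storage):
  queue = FetchQueue(storage, MemoryCache())
  digests = ('deadbeef', 'cafebabe')
  for digest in digests:
    queue.add(digest, priority=threading_utils.PRIORITY_HIGH)
  return queue.wait(digests)

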
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


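# Illustrative sketch (not part of the original module): the verifier passes
# complete streams through unchanged and raises IOError on a short read.
def _example_stream_verification():
  ok = FetchStreamVerifier(iter(['ab', 'cd']), 4)
  assert ''.join(ok.run()) == 'abcd'
  short = FetchStreamVerifier(iter(['ab']), 4)
  try:
    ''.join(short.run())
    assert False, 'expected IOError'
  except IOError:
    pass

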
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file-like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # _evicted is a list of evicted item sizes, see LocalCache.__init__().
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0


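# Illustrative sketch (not part of the original module): round-trips a buffer
# through MemoryCache; write() returns the digest for chaining. The digest
# value is hypothetical.
def _example_memory_cache():
  cache = MemoryCache()
  digest = cache.write('deadbeef', ['hello ', 'world'])
  assert digest == 'deadbeef'
  assert cache.getfileobj('deadbeef').read() == 'hello world'

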
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


1063class DiskCache(LocalCache):
1064 """Stateful LRU cache in a flat hash table in a directory.
1065
1066 Saves its state as json file.
1067 """
maruel12e30012015-10-09 11:55:35 -07001068 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001069
maruele6fc9382017-05-04 09:03:48 -07001070 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001071 """
1072 Arguments:
1073 cache_dir: directory where to place the cache.
1074 policies: cache retention policies.
1075 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001076 trim: if True to enforce |policies| right away.
1077 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001078 """
maruel064c0a32016-04-05 11:47:15 -07001079 # All protected methods (starting with '_') except _path should be called
1080 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001081 super(DiskCache, self).__init__()
1082 self.cache_dir = cache_dir
1083 self.policies = policies
1084 self.hash_algo = hash_algo
1085 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001086 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001087 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001088 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001089 file_path.ensure_tree(self.cache_dir)
1090 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001091 # The first item in the LRU cache that must not be evicted during this run
1092 # since it was referenced. All items more recent that _protected in the LRU
1093 # cache are also inherently protected. It could be a set() of all items
1094 # referenced but this increases memory usage without a use case.
1095 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001096 # Cleanup operations done by self._load(), if any.
1097 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001098 with tools.Profiler('Setup'):
1099 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001100 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001101
nodirbe642ff2016-06-09 15:51:51 -07001102 def __contains__(self, digest):
1103 with self._lock:
1104 return digest in self._lru
1105
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001106 def __enter__(self):
1107 return self
1108
1109 def __exit__(self, _exc_type, _exec_value, _traceback):
1110 with tools.Profiler('CleanupTrimming'):
1111 with self._lock:
1112 self._trim()
1113
1114 logging.info(
1115 '%5d (%8dkb) added',
1116 len(self._added), sum(self._added) / 1024)
1117 logging.info(
1118 '%5d (%8dkb) current',
1119 len(self._lru),
1120 sum(self._lru.itervalues()) / 1024)
1121 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001122 '%5d (%8dkb) evicted',
1123 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001124 logging.info(
1125 ' %8dkb free',
1126 self._free_disk / 1024)
1127 return False
1128
1129 def cached_set(self):
1130 with self._lock:
1131 return self._lru.keys_set()
1132
maruel36a963d2016-04-08 17:15:49 -07001133 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001134 """Cleans up the cache directory.
1135
1136 Ensures there is no unknown files in cache_dir.
1137 Ensures the read-only bits are set correctly.
1138
1139 At that point, the cache was already loaded, trimmed to respect cache
1140 policies.
1141 """
1142 fs.chmod(self.cache_dir, 0700)
1143 # Ensure that all files listed in the state still exist and add new ones.
1144 previous = self._lru.keys_set()
1145 # It'd be faster if there were a readdir() function.
1146 for filename in fs.listdir(self.cache_dir):
1147 if filename == self.STATE_FILE:
1148 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1149 continue
1150 if filename in previous:
1151 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1152 previous.remove(filename)
1153 continue
1154
1155 # An untracked file. Delete it.
1156 logging.warning('Removing unknown file %s from cache', filename)
1157 p = self._path(filename)
1158 if fs.isdir(p):
1159 try:
1160 file_path.rmtree(p)
1161 except OSError:
1162 pass
1163 else:
1164 file_path.try_remove(p)
1165 continue
1166
1167 if previous:
1168 # Filter out entries that were not found.
1169 logging.warning('Removed %d lost files', len(previous))
1170 for filename in previous:
1171 self._lru.pop(filename)
maruele6fc9382017-05-04 09:03:48 -07001172 self._save()
maruel36a963d2016-04-08 17:15:49 -07001173
1174 # What remains to be done is to hash every single item to
1175 # detect corruption, then save to ensure state.json is up to date.
1176 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1177 # TODO(maruel): Let's revisit once directory metadata is stored in
1178 # state.json so only the files that had been mapped since the last cleanup()
1179 # call are manually verified.
1180 #
1181 #with self._lock:
1182 # for digest in self._lru:
1183 # if not isolated_format.is_valid_hash(
1184 # self._path(digest), self.hash_algo):
1185 # self.evict(digest)
1186 # logging.info('Deleted corrupted item: %s', digest)
1187
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001188 def touch(self, digest, size):
vadimsh129e5942017-01-04 16:42:46 -08001189 """Verifies an actual file is valid and bumps its LRU position.
1190
1191 Returns False if the file is missing or invalid. Doesn't kick it from LRU
1192 though (call 'evict' explicitly).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001193
1194 Note that is doesn't compute the hash so it could still be corrupted if the
1195 file size didn't change.
1196
1197 TODO(maruel): More stringent verification while keeping the check fast.
1198 """
1199 # Do the check outside the lock.
1200 if not is_valid_file(self._path(digest), size):
1201 return False
1202
1203 # Update it's LRU position.
1204 with self._lock:
1205 if digest not in self._lru:
1206 return False
1207 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001208 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001209 return True
1210
1211 def evict(self, digest):
1212 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001213 # Do not check for 'digest == self._protected' since it could be because
maruel083fa552016-04-08 14:38:01 -07001214 # the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001215 self._lru.pop(digest)
1216 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1217
tansell9e04a8d2016-07-28 09:31:59 -07001218 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001219 try:
tansell9e04a8d2016-07-28 09:31:59 -07001220 f = fs.open(self._path(digest), 'rb')
1221 with self._lock:
1222 self._used.append(self._lru[digest])
1223 return f
nodir445097b2016-06-03 22:50:26 -07001224 except IOError:
1225 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001226
  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have its
    # write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      # 1) Inside the |content| generator in case of network or unzipping
      #    errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads state of the cache from the JSON state file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not fs.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s', err)
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary, otherwise the file can't be created.
        file_path.set_read_only(d, False)
      if fs.isfile(self.state_file):
        file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Enforces the retention policies and ensures enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fKiB '
          'free, %.1fKiB cache (%.1f%% of its maximum capacity of %.1fKiB)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error(
            'Not enough space to fetch the whole isolated tree; %sb free, min '
            'is %sb' % (self._free_disk, self.policies.min_free_space))
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # it only uses its cached values. self._trim() will be called later to
    # enforce real trimming, but doing this quick version here makes it
    # possible to map an isolated tree that is larger than the current amount
    # of free disk space when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete file %s:\n%s', digest, e)

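# A minimal usage sketch for DiskCache (illustrative only; the directory,
# policy values and namespace below are assumptions, not defaults):
#
#   policies = CachePolicies(
#       max_cache_size=50*1024*1024*1024,  # 50 GiB.
#       min_free_space=2*1024*1024*1024,   # 2 GiB.
#       max_items=100000)
#   cache = DiskCache(
#       u'/tmp/isolate_cache', policies,
#       isolated_format.get_hash_algo('default-gzip'))
#   with cache:
#     if not cache.touch(digest, size):  # Bumps the LRU position when valid.
#       ...  # Fetch the content and cache.write() it again.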

class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']

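# Illustrative sketch of consuming a bundle; 'fetch_queue' and 'root_hash' are
# assumed to already exist (fetch_isolated() below does this for real):
#
#   bundle = IsolatedBundle()
#   bundle.fetch(fetch_queue, root_hash, storage.hash_algo)
#   # bundle.files maps relative paths to properties ('h', 's', 'm', ...);
#   # bundle.command and bundle.relative_cwd come from the isolated tree.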

def get_storage(url, namespace):
  """Returns a Storage instance that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use for shared cloud-based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(isolate_storage.get_storage_api(url, namespace))
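# Minimal usage sketch; the server URL is a placeholder, not a real endpoint:
#
#   with get_storage('https://isolateserver.example.com', 'default-gzip') as s:
#     s.upload_items(items)  # items: iterable of FileItem instances.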


def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given URL.

  Arguments:
    base_url: The URL of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)
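# Shape of an |infiles| entry, for illustration ('h' is the content hash, 's'
# the size in bytes, 'l' would mark a symlink; the values are placeholders):
#
#   (u'/absolute/path/foo.txt', {'h': '<digest>', 's': 123, 'priority': '0'})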


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'tar':
                basedir = os.path.dirname(fullpath)
                with tarfile.TarFile(fileobj=srcfileobj) as extractor:
                  for ti in extractor:
                    if not ti.isfile():
                      logging.warning(
                          'Path(%r) is nonfile (%s), skipped',
                          ti.name, ti.type)
                      continue
                    fp = os.path.normpath(os.path.join(basedir, ti.name))
                    if not fp.startswith(basedir):
                      # Refuse archive members that would escape the
                      # destination directory.
                      logging.error(
                          'Path(%r) is outside root directory',
                          fp)
                      continue
                    ifd = extractor.extractfile(ti)
                    file_path.ensure_tree(os.path.dirname(fp))
                    putfile(ifd, fp, 0700, ti.size)

              elif filetype == 'ar':
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  if not fp.startswith(basedir):
                    # Refuse archive members that would escape the
                    # destination directory.
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                    continue
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r' % filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print(msg)
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
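# Illustrative call; the server URL, hash and output directory are
# placeholders:
#
#   with get_storage('https://isolateserver.example.com', 'default-gzip') as s:
#     bundle = fetch_isolated(
#         isolated_hash='<root isolated hash>',
#         storage=s,
#         cache=MemoryCache(),
#         outdir=u'/tmp/run',
#         use_symlinks=False)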


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
    FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated'))
    for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0

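# Example invocation, with a placeholder server URL:
#
#   isolateserver.py archive -I https://isolateserver.example.com ./out/files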

def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0

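# Example invocations, with placeholder server URL and hashes:
#
#   isolateserver.py download -I https://isolateserver.example.com \
#       -s <isolated hash> -t ./out --cache /tmp/isolate_cache
#   isolateserver.py download -I https://isolateserver.example.com \
#       -f <file hash> dest.txt -t ./out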

def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--is-grpc', action='store_true', help='Communicate to Isolate via gRPC')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if 'ISOLATED_GRPC_PROXY' in os.environ:
    options.is_grpc = True

  if options.is_grpc:
    isolate_storage.set_storage_api_class(isolate_storage.IsolateServerGrpc)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, **kwargs):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace),
        **kwargs)
  else:
    return MemoryCache()

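# Illustrative: after parsing flags registered by add_cache_options(), this
# returns a DiskCache when --cache is set, else a MemoryCache:
#
#   options, _ = parser.parse_args(args)
#   cache = process_cache_options(options)
#   with cache:
#     ...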

class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))