blob: 6bd17c8e3b49e03ac7747b15e41fba22fbddcbbb [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Adrian Ludwin6d2a8342017-08-15 19:56:54 -04008__version__ = '0.8.1'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
nodir90bc8dc2016-06-15 13:35:21 -070010import errno
tansell9e04a8d2016-07-28 09:31:59 -070011import functools
12import io
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040014import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000016import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040017import signal
tansell9e04a8d2016-07-28 09:31:59 -070018import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import sys
tansell26de79e2016-11-13 18:41:11 -080020import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050021import tempfile
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000023import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000025from third_party import colorama
26from third_party.depot_tools import fix_encoding
27from third_party.depot_tools import subcommand
28
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050029from utils import file_path
maruel12e30012015-10-09 11:55:35 -070030from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040031from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040032from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000033from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040034from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070035from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000036from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000037from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080039import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040040import isolated_format
aludwin81178302016-11-30 17:18:49 -080041import isolate_storage
42from isolate_storage import Item
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080043
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000044
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Default regexps of relative paths that should never be archived: editor
# scratch files and version-control metadata directories.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
94
95
class Error(Exception):
  """Base class for the runtime errors raised by this module."""
99
100
class Aborted(Error):
  """Raised when an operation is cancelled via Storage.abort()."""
104
105
class AlreadyExists(Error):
  """Raised when a file that is being created already exists on disk."""
108
109
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of the file at |path| in chunks.

  Reads |chunk_size| bytes at a time, starting at byte |offset|, until EOF.
  """
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    # iter() with a sentinel stops on the first empty read, i.e. at EOF.
    for chunk in iter(functools.partial(f.read, chunk_size), b''):
      yield chunk
120
121
def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000138
139
def fileobj_path(fileobj):
  """Returns the file system path backing a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # Objects created by code outside our control (e.g. the standard library's
  # open("test.txt")) may carry a plain str name; normalize to unicode, which
  # is what the rest of the code base expects for paths.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  return name if fs.exists(name) else None
164
165
166# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
167# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj into dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  # A partial copy from an already-advanced source would silently skip data,
  # so only accept a non-zero position when an explicit size is given.
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    to_read = chunk_size if size <= 0 else min(chunk_size, size - copied)
    buf = srcfileobj.read(to_read)
    if not buf:
      if size == -1:
        break  # Natural EOF when copying "everything".
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(buf)
    copied += len(buf)
193
194
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Arguments:
    srcfileobj: source file-like object; when it is backed by an existing
        read-only file on disk, the content is linked instead of copied.
    dstpath: destination path.
    file_mode: optional mode to fs.chmod() the destination to; 0 is valid.
    size: expected number of bytes, or -1 for "until EOF". Linking is only
        attempted for whole files (size == -1).
    use_symlink: prefer symlinks over hardlinks when linking is possible.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
  - Symbolic links are supported on Windows 7 and later only.
  - SeCreateSymbolicLinkPrivilege is needed, which is not present by
    default.
  - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
    RID is present in the token;
    https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is only safe to link when nobody can write through the new path:
    # readonly means NO write bit is set. NOTE(review): the previous
    # expression lacked the 'not', so writable files were linked (letting the
    # destination mutate the cached copy) and read-only files were copied --
    # the inverse of the intent stated in the comments below.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
257
258
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  Zero-length intermediate outputs are suppressed, so consumers never receive
  empty chunks.
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Flush whatever zlib still buffers internally to terminate the stream.
  final = deflater.flush(zlib.Z_FINISH)
  if final:
    yield final
269
270
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  consumed = 0
  try:
    for piece in content_generator:
      consumed += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain whatever zlib held back beyond chunk_size, one chunk at a time.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    leftover = inflater.flush()
    if leftover:
      yield leftover
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (consumed, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
301
302
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store) for file types that are already compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.zip'), while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('zip'); strip the dot so
  # the membership test can actually match. Without this, every file was
  # compressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
308
309
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files.

  Arguments:
    base_directory: directory under which the (relative) paths in |files|
        will live.
    files: iterable of relative file paths whose parent directories must be
        created.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  directories = set()
  for f in files:
    d = os.path.dirname(f)
    while d and d not in directories:
      directories.add(d)
      parent = os.path.dirname(d)
      # Guard against dirname() fixed points (e.g. dirname('/') == '/'): the
      # previous implementation looped forever when handed an absolute path.
      if parent == d:
        break
      d = parent
  # Sorted order guarantees parents are created before their children.
  for d in sorted(directories):
    abs_d = os.path.join(base_directory, d)
    if not fs.isdir(abs_d):
      fs.mkdir(abs_d)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000324
325
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files.

  Raises AlreadyExists if a link target path already exists on disk.
  """
  for relpath, properties in files:
    # Only entries carrying a link target ('l') are symlinks.
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    try:
      os.symlink(properties['l'], destination)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % destination)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000342
343
def is_valid_file(path, size):
  """Determines whether the file at |path| appears valid.

  Currently it just checks the file exists and its size matches the
  expectation. With size == UNKNOWN_FILE_SIZE only existence is checked.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    on_disk = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size == on_disk:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), on_disk, size)
  return False
364
365
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only touch the file system when the caller did not supply a size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Lazily streams the file as chunks; nothing is read until iterated.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000383
384
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # No digest is passed; it is derived from the content later (see
    # Storage.upload_items which calls item.prepare()).
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    # NOTE: kept by reference, not copied; callers should not mutate |buf|
    # afterwards.
    self.buffer = buf

  def content(self):
    # Single-chunk iterable, mirroring the generator contract of
    # FileItem.content().
    return [self.buffer]
394
395
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000396class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800397 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000398
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800399 Implements compression support, parallel 'contains' checks, parallel uploads
400 and more.
401
402 Works only within single namespace (and thus hashing algorithm and compression
403 scheme are fixed).
404
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400405 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
406 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800407 """
408
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700409 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000410 self._storage_api = storage_api
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400411 self._use_zip = isolated_format.is_namespace_with_compression(
aludwinf33b4bd2017-06-29 12:01:03 -0700412 storage_api.namespace) and not storage_api.internal_compression
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400413 self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000414 self._cpu_thread_pool = None
415 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400416 self._aborted = False
417 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000418
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    Read-only; computed once in __init__.
    """
    return self._hash_algo
426
  @property
  def location(self):
    """URL of the backing store that this class is using.

    Delegates straight to the underlying StorageApi.
    """
    return self._storage_api.location
431
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    Delegates straight to the underlying StorageApi.
    """
    return self._storage_api.namespace
439
440 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000441 def cpu_thread_pool(self):
442 """ThreadPool for CPU-bound tasks like zipping."""
443 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500444 threads = max(threading_utils.num_processors(), 2)
445 if sys.maxsize <= 2L**32:
446 # On 32 bits userland, do not try to use more than 16 threads.
447 threads = min(threads, 16)
448 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000449 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000450
  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    # Created lazily on first use, torn down in close().
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000457
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000458 def close(self):
459 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400460 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000461 if self._cpu_thread_pool:
462 self._cpu_thread_pool.join()
463 self._cpu_thread_pool.close()
464 self._cpu_thread_pool = None
465 if self._net_thread_pool:
466 self._net_thread_pool.join()
467 self._net_thread_pool.close()
468 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400469 logging.info('Done.')
470
471 def abort(self):
472 """Cancels any pending or future operations."""
473 # This is not strictly theadsafe, but in the worst case the logging message
474 # will be printed twice. Not a big deal. In other places it is assumed that
475 # unprotected reads and writes to _aborted are serializable (it is true
476 # for python) and thus no locking is used.
477 if not self._aborted:
478 logging.warning('Aborting... It can take a while.')
479 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000480
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000481 def __enter__(self):
482 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400483 assert not self._prev_sig_handlers, self._prev_sig_handlers
484 for s in (signal.SIGINT, signal.SIGTERM):
485 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000486 return self
487
488 def __exit__(self, _exc_type, _exc_value, _traceback):
489 """Context manager interface."""
490 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400491 while self._prev_sig_handlers:
492 s, h = self._prev_sig_handlers.popitem()
493 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000494 return False
495
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      # setdefault returns the already-stored item when the digest repeats.
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    # Each completed upload posts its Item back on this channel.
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    # "Cache hit" here means the server already had the item.
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
570
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    # Runs on a net_thread_pool worker thread.
    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    # Runs on cpu_thread_pool, then hands the zipped blob to net_thread_pool.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Forward the current exception to whoever is pulling from |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000622
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800623 def push(self, item, push_state):
624 """Synchronously pushes a single item to the server.
625
626 If you need to push many items at once, consider using 'upload_items' or
627 'async_push' with instance of TaskChannel.
628
629 Arguments:
630 item: item to upload as instance of Item class.
631 push_state: push state returned by 'get_missing_items' call for |item|.
632
633 Returns:
634 Pushed item (same object as |item|).
635 """
636 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700637 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800638 self.async_push(channel, item, push_state)
639 pushed = channel.pull()
640 assert pushed is item
641 return item
642
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      # Runs on a net_thread_pool worker; its return value (|digest|) is what
      # gets delivered back on |channel|.
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|. The pipeline is lazy: nothing is
        # actually read from the network until |sink| consumes the generator.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
671
  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      # Runs on a net_thread_pool worker; bail out quickly on abort instead of
      # issuing a doomed network call.
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in. Each pull() returns one batch's result:
    # a dict mapping a missing Item to its storage-specific push_state.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000710
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000711
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_index = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  batch = []
  # Items are examined biggest first (sorted by size, descending).
  for entry in sorted(items, key=lambda item: item.size, reverse=True):
    batch.append(entry)
    if len(batch) < limit:
      continue
    yield batch
    batch = []
    batch_index += 1
    # The batch size ramps up along ITEMS_PER_CONTAINS_QUERIES and clamps at
    # its last element.
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(batch_index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if batch:
    yield batch
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000738
739
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    # Storage instance used to fetch items over the network.
    self.storage = storage
    # LocalCache that receives the fetched content.
    self.cache = cache
    # Channel on which completed fetches report their digest.
    self._channel = threading_utils.TaskChannel()
    # Digests whose fetch has started but not yet finished.
    self._pending = set()
    # Every digest ever requested via add(); checked by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache (seeded from the cache itself).
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching. Completion is reported on self._channel; the fetched
    # bytes are streamed straight into the cache via cache.write.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching. Digests not in
    # |digests| still get moved from _pending to _fetched as they complete.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
835
836
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    # Number of bytes seen so far; compared to expected_size on the last chunk.
    self.current_size = 0
    # Incrementally updated hash of everything seen so far.
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.

    NOTE(review): if |stream| yields no chunks at all, this generator yields
    nothing and performs no size/digest verification (|stored| stays None).
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      # Last chunk: triggers the size and digest checks before it is yielded.
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    Raises IOError on size or digest mismatch (only checked on the last chunk).
    """
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000908
909
class CacheMiss(Exception):
  """Raised when a requested item is absent from the local cache."""

  def __init__(self, digest):
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)
    # Digest of the item that was requested but not found.
    self.digest = digest
917
918
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  # Disk-backed subclasses set this to the cache directory path (see DiskCache).
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    # Sizes of items added during this run.
    self._added = []
    # Number and total size of items present when the cache was loaded.
    self._initial_number_items = 0
    self._initial_size = 0
    # Items evicted during this run (sizes; see _load/__exit__ of DiskCache).
    self._evicted = []
    # Sizes of items read back during this run.
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    # Returns a copy so callers can't mutate internal profiling state.
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001014
1015class MemoryCache(LocalCache):
1016 """LocalCache implementation that stores everything in memory."""
1017
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001018 def __init__(self, file_mode_mask=0500):
1019 """Args:
1020 file_mode_mask: bit mask to AND file mode with. Default value will make
1021 all mapped files to be read only.
1022 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001023 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001024 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001025 self._contents = {}
1026
nodirbe642ff2016-06-09 15:51:51 -07001027 def __contains__(self, digest):
1028 with self._lock:
1029 return digest in self._contents
1030
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001031 def cached_set(self):
1032 with self._lock:
1033 return set(self._contents)
1034
maruel36a963d2016-04-08 17:15:49 -07001035 def cleanup(self):
1036 pass
1037
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001038 def touch(self, digest, size):
1039 with self._lock:
1040 return digest in self._contents
1041
1042 def evict(self, digest):
1043 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001044 v = self._contents.pop(digest, None)
1045 if v is not None:
1046 self._evicted.add(v)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001047
tansell9e04a8d2016-07-28 09:31:59 -07001048 def getfileobj(self, digest):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001049 with self._lock:
nodir445097b2016-06-03 22:50:26 -07001050 try:
tansell9e04a8d2016-07-28 09:31:59 -07001051 d = self._contents[digest]
nodir445097b2016-06-03 22:50:26 -07001052 except KeyError:
1053 raise CacheMiss(digest)
tansell9e04a8d2016-07-28 09:31:59 -07001054 self._used.append(len(d))
1055 return io.BytesIO(d)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001056
1057 def write(self, digest, content):
1058 # Assemble whole stream before taking the lock.
1059 data = ''.join(content)
1060 with self._lock:
1061 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001062 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001063 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001064
maruele6fc9382017-05-04 09:03:48 -07001065 def trim(self):
1066 """Trimming is not implemented for MemoryCache."""
1067 return 0
1068
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001069
class CachePolicies(object):
  """Holds the retention limits applied to a local cache.

  Attributes:
    max_cache_size: trim if the cache grows beyond this size. If 0, the
        cache is effectively a leak.
    min_free_space: trim if disk free space becomes lower than this value.
        If 0, it unconditionally fill the disk.
    max_items: maximum number of items to keep in the cache. If 0, do not
        enforce a limit.
  """

  def __init__(self, max_cache_size, min_free_space, max_items):
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1084
1085
1086class DiskCache(LocalCache):
1087 """Stateful LRU cache in a flat hash table in a directory.
1088
1089 Saves its state as json file.
1090 """
maruel12e30012015-10-09 11:55:35 -07001091 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001092
  def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies (a CachePolicies instance).
      hash_algo: hashing algorithm used.
      trim: if True to enforce |policies| right away.
        It can be done later by calling trim() explicitly.
      time_fn: optional clock override installed on the LRU dict (for tests).
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001124
  def __contains__(self, digest):
    """True if an entry for |digest| is tracked by the LRU state."""
    with self._lock:
      return digest in self._lru
1128
  def __enter__(self):
    """Context manager interface."""
    return self
1131
  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface: trims the cache and logs usage statistics."""
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      # NOTE(review): the profiling counters below are read outside the lock.
      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          ' %8dkb free',
          self._free_disk / 1024)
    # Never suppresses an exception raised in the 'with' body.
    return False
1151
  def cached_set(self):
    """Returns a new set of all digests currently tracked by the LRU state."""
    with self._lock:
      return self._lru.keys_set()
1155
  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there is no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    with self._lock:
      # Cache directory itself stays writable by the owner only.
      fs.chmod(self.cache_dir, 0700)
      # Ensure that all files listed in the state still exist and add new ones.
      previous = self._lru.keys_set()
      # It'd be faster if there were a readdir() function.
      for filename in fs.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          # State file must remain writable so it can be saved.
          fs.chmod(os.path.join(self.cache_dir, filename), 0600)
          continue
        if filename in previous:
          # Known cached item: enforce read-only.
          fs.chmod(os.path.join(self.cache_dir, filename), 0400)
          previous.remove(filename)
          continue

        # An untracked file. Delete it.
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if fs.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            pass
        else:
          file_path.try_remove(p)
        continue

      if previous:
        # Filter out entries that were not found.
        logging.warning('Removed %d lost files', len(previous))
        for filename in previous:
          self._lru.pop(filename)
        self._save()

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)
1211
  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash so it could still be corrupted if the
    file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      # First touched item of the run becomes the eviction barrier.
      self._protected = self._protected or digest
    return True
1234
  def evict(self, digest):
    """Removes the item from the LRU state and deletes its file.

    Raises KeyError (from the LRU pop) if |digest| is not tracked.
    """
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)
1241
  def getfileobj(self, digest):
    """Opens the cached file for |digest| and records its size as used.

    Raises CacheMiss if the file cannot be opened. NOTE(review): a digest
    whose file exists but is absent from the LRU state raises KeyError from
    self._lru[digest] instead — confirm this is intended.
    """
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001250
  def write(self, digest, content):
    """Streams |content| chunks to disk under |digest| and tracks it in LRU.

    Returns |digest| to simplify chaining.
    """
    assert content is not None
    with self._lock:
      # Protect the first item written this run from eviction.
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      #   1) Inside |content| generator in case of network or unzipping errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001277
nodirf33b8d62016-10-26 22:34:58 -07001278 def get_oldest(self):
1279 """Returns digest of the LRU item or None."""
1280 try:
1281 return self._lru.get_oldest()[0]
1282 except KeyError:
1283 return None
1284
  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)
1291
  def trim(self):
    """Forces retention policies.

    Returns the number of items evicted (see _trim).
    """
    with self._lock:
      return self._trim()
nodirf33b8d62016-10-26 22:34:58 -07001296
  def _load(self, trim, time_fn):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.

    Must be called with self._lock held.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      # Test hook: overrides the clock used for LRU timestamps.
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001327
  def _save(self):
    """Saves the LRU ordering.

    Must be called with self._lock held.
    """
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
      if fs.isfile(self.state_file):
        file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)
1339
  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists.

    Must be called with self._lock held. Returns the number of items evicted
    due to insufficient free disk space.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        # NOTE(review): divides by max_cache_size; if the policy is 0
        # (unbounded) and anything remains cached this raises — confirm.
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001381
1382 def _path(self, digest):
1383 """Returns the path to one item."""
1384 return os.path.join(self.cache_dir, digest)
1385
maruel2e8d0f52016-07-16 07:51:29 -07001386 def _remove_lru_file(self, allow_protected):
1387 """Removes the lastest recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001388 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001389 try:
nodireabc11c2016-10-18 16:37:28 -07001390 digest, (size, _) = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001391 if not allow_protected and digest == self._protected:
maruele6fc9382017-05-04 09:03:48 -07001392 raise Error(
1393 'Not enough space to fetch the whole isolated tree; %sb free, min '
1394 'is %sb' % (self._free_disk, self.policies.min_free_space))
maruel083fa552016-04-08 14:38:01 -07001395 except KeyError:
1396 raise Error('Nothing to remove')
nodireabc11c2016-10-18 16:37:28 -07001397 digest, (size, _) = self._lru.pop_oldest()
vadimsh129e5942017-01-04 16:42:46 -08001398 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001399 self._delete_file(digest, size)
1400 return size
1401
1402 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1403 """Adds an item into LRU cache marking it as a newest one."""
1404 self._lock.assert_locked()
1405 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001406 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001407 self._added.append(size)
1408 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001409 self._free_disk -= size
1410 # Do a quicker version of self._trim(). It only enforces free disk space,
1411 # not cache size limits. It doesn't actually look at real free disk space,
1412 # only uses its cache values. self._trim() will be called later to enforce
1413 # real trimming but doing this quick version here makes it possible to map
1414 # an isolated that is larger than the current amount of free disk space when
1415 # the cache size is already large.
1416 while (
1417 self.policies.min_free_space and
1418 self._lru and
1419 self._free_disk < self.policies.min_free_space):
maruel2e8d0f52016-07-16 07:51:29 -07001420 self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001421
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001422 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1423 """Deletes cache file from the file system."""
1424 self._lock.assert_locked()
1425 try:
1426 if size == UNKNOWN_FILE_SIZE:
vadimsh129e5942017-01-04 16:42:46 -08001427 try:
1428 size = fs.stat(self._path(digest)).st_size
1429 except OSError:
1430 size = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001431 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001432 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001433 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001434 except OSError as e:
vadimsh129e5942017-01-04 16:42:46 -08001435 if e.errno != errno.ENOENT:
1436 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001437
1438
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of arguments, from the 'command' entry.
    self.command = []
    # Map of relative path -> file properties dict, merged over all includes.
    self.files = {}
    # Tri-state: None means "not specified by any .isolated in the tree".
    self.read_only = None
    # Relative working directory for the command; defaults to '' after fetch().
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Schedules one *.isolated file for high-priority fetch, rejecting
      # include cycles via |seen|.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        # Entries without 'h' (e.g. symlinks) have no content to fetch.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. First node in traversal order that sets a property
    # wins; later nodes cannot override it.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1567
1568
def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  storage_api = isolate_storage.get_storage_api(url, namespace)
  return Storage(storage_api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001582
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001583
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Symlinks ('l' entries) are not represented by items on the isolate server
  # side, and duplicated paths only need a single upload, so both are skipped.
  file_items = []
  seen_paths = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' in metadata or filepath in seen_paths:
      skipped += 1
      continue
    seen_paths.add(filepath)
    file_items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(file_items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001614
1615
maruel4409e302016-07-19 14:25:51 -07001616def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001617 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001618
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001619 Arguments:
1620 isolated_hash: hash of the root *.isolated file.
1621 storage: Storage class that communicates with isolate storage.
1622 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001623 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001624 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001625
1626 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001627 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001628 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001629 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001630 'fetch_isolated(%s, %s, %s, %s, %s)',
1631 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001632 # Hash algorithm to use, defined by namespace |storage| is using.
1633 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001634 with cache:
1635 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001636 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001637
1638 with tools.Profiler('GetIsolateds'):
1639 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001640 if not isolated_format.is_valid_hash(isolated_hash, algo):
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001641 logging.debug('%s is not a valid hash, assuming a file '
1642 '(algo was %s, hash size was %d)',
1643 isolated_hash, algo(), algo().digest_size)
maruel1ceb3872015-10-14 06:10:44 -07001644 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001645 try:
maruel1ceb3872015-10-14 06:10:44 -07001646 isolated_hash = fetch_queue.inject_local_file(path, algo)
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001647 except IOError as e:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001648 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001649 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001650 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001651
1652 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001653 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001654
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001655 with tools.Profiler('GetRest'):
1656 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001657 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001658 create_directories(outdir, bundle.files)
1659 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001660
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001661 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001662 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001663 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001664
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001665 # Multimap: digest -> list of pairs (path, props).
1666 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001667 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001668 if 'h' in props:
1669 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001670
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001671 # Now block on the remaining files to be downloaded and mapped.
1672 logging.info('Retrieving remaining files (%d of them)...',
1673 fetch_queue.pending_count)
1674 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001675 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001676 while remaining:
1677 detector.ping()
1678
1679 # Wait for any item to finish fetching to cache.
1680 digest = fetch_queue.wait(remaining)
1681
tansell9e04a8d2016-07-28 09:31:59 -07001682 # Create the files in the destination using item in cache as the
1683 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001684 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001685 fullpath = os.path.join(outdir, filepath)
1686
1687 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001688 filetype = props.get('t', 'basic')
1689
1690 if filetype == 'basic':
1691 file_mode = props.get('m')
1692 if file_mode:
1693 # Ignore all bits apart from the user
1694 file_mode &= 0700
1695 putfile(
1696 srcfileobj, fullpath, file_mode,
1697 use_symlink=use_symlinks)
1698
tansell26de79e2016-11-13 18:41:11 -08001699 elif filetype == 'tar':
1700 basedir = os.path.dirname(fullpath)
1701 with tarfile.TarFile(fileobj=srcfileobj) as extractor:
1702 for ti in extractor:
1703 if not ti.isfile():
1704 logging.warning(
1705 'Path(%r) is nonfile (%s), skipped',
1706 ti.name, ti.type)
1707 continue
1708 fp = os.path.normpath(os.path.join(basedir, ti.name))
1709 if not fp.startswith(basedir):
1710 logging.error(
1711 'Path(%r) is outside root directory',
1712 fp)
1713 ifd = extractor.extractfile(ti)
1714 file_path.ensure_tree(os.path.dirname(fp))
1715 putfile(ifd, fp, 0700, ti.size)
1716
tanselle4288c32016-07-28 09:45:40 -07001717 else:
1718 raise isolated_format.IsolatedError(
1719 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001720
1721 # Report progress.
1722 duration = time.time() - last_update
1723 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1724 msg = '%d files remaining...' % len(remaining)
1725 print msg
1726 logging.info(msg)
1727 last_update = time.time()
1728
1729 # Cache could evict some items we just tried to fetch, it's a fatal error.
1730 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001731 raise isolated_format.MappingError(
1732 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001733 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001734
1735
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    # The 't' entry is dropped from every file's metadata.
    meta.pop('t')
    metadata[relpath] = meta
  # Entries without a content hash (e.g. symlinks) produce no upload item.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1757
1758
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  # Duplicates are rejected up front rather than silently de-duplicated.
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          # mkstemp() opens the file; only the path is needed, so close the fd
          # immediately to avoid leaking it.
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated is uploaded too, at high priority.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          # The directory maps to the hash of its generated .isolated file.
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        # NOTE(review): the original OSError detail is discarded here; the
        # caller only sees which path failed, not why.
        raise Error('Failed to process %s.' % f)
    # 'cold' = items the server did not have (actually uploaded);
    # 'hot' = items already present server-side.
    # NOTE(review): membership tests against |uploaded| (a list) are O(n) each,
    # so this pair of comprehensions is O(n^2) for large uploads.
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    # Best-effort cleanup of the generated .isolated files.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001833
1834
def archive(out, namespace, files, blacklist):
  """Uploads |files| to the server at |out| and prints 'hash path' pairs."""
  # '-' as the sole argument means the file list arrives on stdin.
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  paths = [f.decode('utf-8') for f in files]
  is_blacklisted = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Only the (hash, path) pairs are needed; the cold/hot stats are dropped.
    results = archive_files_to_storage(storage, paths, is_blacklisted)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1848
1849
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  # NOTE(review): the meaning of the two boolean arguments is defined by
  # process_isolate_server_options (not visible in this chunk) — confirm there.
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Reported as a usage error; parser.error() exits the process.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001871
1872
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Failure to enable symlink support is logged but not fatal.
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  # --isolated and --file are mutually exclusive, and exactly one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to map a tree over an existing file or a non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest has been written out.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        # NOTE(review): |rel| already includes options.target, so joining it
        # with options.target again is redundant; os.path.join returns |rel|
        # unchanged because it is absolute. Harmless but confusing — consider
        # printing |rel| directly.
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
1950
1951
def add_archive_options(parser):
  """Registers the archival-related options on |parser|.

  Currently only --blacklist, the list of regexps used to skip files when
  archiving a directory.
  """
  parser.add_option(
      '--blacklist',
      action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
1958
1959
def add_isolate_server_options(parser):
  """Registers the Isolate Server connection options on |parser|.

  Adds -I/--isolate-server, --grpc-proxy and --namespace.
  """
  # Pick up the default server URL from the environment so bots don't have to
  # pass it explicitly on every invocation.
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      default=default_server,
      metavar='URL',
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
1973
1974
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Validates and applies the --isolate-server related options.

  Normalizes options.isolate_server in place (or configures the gRPC proxy
  when one was given), optionally installs the exception reporting handler,
  then logs in.

  Returns the identity as determined by the server, or None when no server
  was specified and one is not required.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return None

  if options.grpc_proxy:
    # All communication goes through the proxy; no URL normalization needed.
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as exc:
      parser.error('--isolate-server %s' % exc)

  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)

  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001999
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002000
def add_cache_options(parser):
  """Registers the local cache tuning options on |parser|.

  These control where the on-disk cache lives and the thresholds (total
  size, free disk space, item count) past which it gets trimmed.
  """
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  group.add_option(
      '--max-cache-size',
      metavar='NNN',
      type='int',
      default=50 * 1024 ** 3,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      metavar='NNN',
      type='int',
      default=2 * 1024 ** 3,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      metavar='NNN',
      type='int',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2028
2029
def process_cache_options(options, **kwargs):
  """Returns the cache implementation selected by the parsed options.

  A DiskCache rooted at options.cache when --cache was given, otherwise an
  in-memory cache. Extra keyword arguments are forwarded to DiskCache().
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace),
      **kwargs)
2043
2044
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Shared option parser for this script's subcommands.

  Extends the logging-aware base parser with authentication options; sets the
  version and program name from this module.
  """

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses the command line, then applies the authentication options."""
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args
2059
2060
def main(args):
  """Dispatches |args| to the matching CMD* subcommand in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002064
2065
if __name__ == '__main__':
  # One-time process setup before dispatching; order matters (encoding and
  # buffering are fixed before anything is printed).
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()  # Enables ANSI color escape handling on Windows consoles.
  sys.exit(main(sys.argv[1:]))