#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.1'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more items per request, the lower the effect of HTTP round trip latency
# and TCP-level chattiness. On the other hand, larger values cause longer
# lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

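
# Illustrative sketch (not part of the original module): DEFAULT_BLACKLIST
# entries are regexps matched against relative paths; a helper like the
# hypothetical one below is how such a blacklist is typically applied.
def _example_is_blacklisted(relpath, blacklist=DEFAULT_BLACKLIST):
  """Returns True if |relpath| matches any blacklist regexp."""
  return any(re.match(pattern, relpath) for pattern in blacklist)
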

class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total

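
# Illustrative sketch (not part of the original module): file_read() and
# file_write() compose into a chunked copy that never loads the whole file
# in memory. The helper name is hypothetical.
def _example_copy_file(src, dst):
  """Streams |src| into |dst| and returns the number of bytes written."""
  return file_write(dst, file_read(src))
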

def fileobj_path(fileobj):
  """Returns the file system path for a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created using something like open("test.txt")
  # name will end up being a str (such as a function outside our control, like
  # the standard library). We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal reports that ext2 is known to be potentially faulty on a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is read only when none of the write bits are set.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)

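
# Illustrative sketch (not part of the original module): putfile() with a
# plain open file; it links when the source is read only and safe to share,
# and copies otherwise. The paths and mode are made up for the example.
def _example_putfile(srcpath, dstpath):
  """Materializes |srcpath| at |dstpath| as a read-only file."""
  with fs.open(srcpath, 'rb') as srcfileobj:
    putfile(srcfileobj, dstpath, file_mode=0400)
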

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

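
# Illustrative sketch (not part of the original module): zip_compress() and
# zip_decompress() are stream inverses, so a round trip returns the input.
def _example_zip_round_trip(data):
  """Returns |data| after a compress/decompress round trip."""
  return ''.join(zip_decompress(zip_compress([data])))
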

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the lookup matches
  # the dot-less entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1][1:].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks that the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]

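
# Illustrative sketch (not part of the original module): the two Item flavors
# as they would be handed to Storage.upload_items(). The path and buffer
# content are made up.
def _example_make_items():
  """Builds one FileItem and one BufferItem ready for upload."""
  return [
      FileItem('/tmp/example.bin'),
      BufferItem('{"version": 1}', high_priority=True),
  ]
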

class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out which items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state

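
# Illustrative sketch (not part of the original module): the intended life
# cycle of a Storage object. isolate_storage.get_storage_api() and its
# arguments are assumptions here, and the server URL is made up.
def _example_upload(items):
  """Uploads |items|, returning the ones actually transferred."""
  storage_api = isolate_storage.get_storage_api(
      'https://isolate.example.com', 'default-gzip')
  with Storage(storage_api) as storage:
    return storage.upload_items(items)
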

def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

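
# Illustrative sketch (not part of the original module): with the default
# ITEMS_PER_CONTAINS_QUERIES, for example 120 items are batched as
# 20, 20, 50 and 30, largest items first.
def _example_batch_sizes(items):
  """Returns the batch sizes batch_items_for_check() would produce."""
  return [len(batch) for batch in batch_items_for_check(items)]
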

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)

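
# Illustrative sketch (not part of the original module): running a small
# in-memory stream through FetchStreamVerifier; raises IOError on a size or
# digest mismatch. Using hashlib.sha1 is an assumption matching the default
# namespace.
def _example_verify_stream(chunks, expected_digest, expected_size):
  """Consumes |chunks| through the verifier and returns the joined data."""
  import hashlib
  verifier = FetchStreamVerifier(
      iter(chunks), hashlib.sha1, expected_digest, expected_size)
  return ''.join(verifier.run())
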

class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # self._evicted is a list of sizes (it has no add()), so record the
        # length of the evicted content.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0

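
# Illustrative sketch (not part of the original module): a MemoryCache round
# trip. The digest value is made up; real callers pass the content's hash.
def _example_memory_cache_round_trip():
  """Writes a blob into a MemoryCache and reads it back."""
  cache = MemoryCache()
  digest = cache.write('fakedigest', ['hello ', 'world'])
  return cache.getfileobj(digest).read()  # 'hello world'
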
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001070class CachePolicies(object):
1071 def __init__(self, max_cache_size, min_free_space, max_items):
1072 """
1073 Arguments:
1074 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1075 cache is effectively a leak.
1076 - min_free_space: Trim if disk free space becomes lower than this value. If
1077 0, it unconditionally fill the disk.
1078 - max_items: Maximum number of items to keep in the cache. If 0, do not
1079 enforce a limit.
1080 """
1081 self.max_cache_size = max_cache_size
1082 self.min_free_space = min_free_space
1083 self.max_items = max_items
1084
1085
1086class DiskCache(LocalCache):
1087 """Stateful LRU cache in a flat hash table in a directory.
1088
1089 Saves its state as json file.
1090 """
maruel12e30012015-10-09 11:55:35 -07001091 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001092
maruele6fc9382017-05-04 09:03:48 -07001093 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001094 """
1095 Arguments:
1096 cache_dir: directory where to place the cache.
1097 policies: cache retention policies.
1098 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001099 trim: if True to enforce |policies| right away.
1100 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001101 """
maruel064c0a32016-04-05 11:47:15 -07001102 # All protected methods (starting with '_') except _path should be called
1103 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001104 super(DiskCache, self).__init__()
1105 self.cache_dir = cache_dir
1106 self.policies = policies
1107 self.hash_algo = hash_algo
1108 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001109 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001110 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001111 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001112 file_path.ensure_tree(self.cache_dir)
1113 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001114 # The first item in the LRU cache that must not be evicted during this run
1115 # since it was referenced. All items more recent that _protected in the LRU
1116 # cache are also inherently protected. It could be a set() of all items
1117 # referenced but this increases memory usage without a use case.
1118 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001119 # Cleanup operations done by self._load(), if any.
1120 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001121 with tools.Profiler('Setup'):
1122 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001123 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001124
nodirbe642ff2016-06-09 15:51:51 -07001125 def __contains__(self, digest):
1126 with self._lock:
1127 return digest in self._lru
1128
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001129 def __enter__(self):
1130 return self
1131
1132 def __exit__(self, _exc_type, _exec_value, _traceback):
1133 with tools.Profiler('CleanupTrimming'):
1134 with self._lock:
1135 self._trim()
1136
1137 logging.info(
1138 '%5d (%8dkb) added',
1139 len(self._added), sum(self._added) / 1024)
1140 logging.info(
1141 '%5d (%8dkb) current',
1142 len(self._lru),
1143 sum(self._lru.itervalues()) / 1024)
1144 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001145 '%5d (%8dkb) evicted',
1146 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001147 logging.info(
1148 ' %8dkb free',
1149 self._free_disk / 1024)
1150 return False
1151
1152 def cached_set(self):
1153 with self._lock:
1154 return self._lru.keys_set()
1155
maruel36a963d2016-04-08 17:15:49 -07001156 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001157 """Cleans up the cache directory.
1158
1159 Ensures there is no unknown files in cache_dir.
1160 Ensures the read-only bits are set correctly.
1161
1162 At that point, the cache was already loaded, trimmed to respect cache
1163 policies.
1164 """
1165 fs.chmod(self.cache_dir, 0700)
1166 # Ensure that all files listed in the state still exist and add new ones.
1167 previous = self._lru.keys_set()
1168 # It'd be faster if there were a readdir() function.
1169 for filename in fs.listdir(self.cache_dir):
1170 if filename == self.STATE_FILE:
1171 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1172 continue
1173 if filename in previous:
1174 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1175 previous.remove(filename)
1176 continue
1177
1178 # An untracked file. Delete it.
1179 logging.warning('Removing unknown file %s from cache', filename)
1180 p = self._path(filename)
1181 if fs.isdir(p):
1182 try:
1183 file_path.rmtree(p)
1184 except OSError:
1185 pass
1186 else:
1187 file_path.try_remove(p)
1188 continue
1189
1190 if previous:
1191 # Filter out entries that were not found.
1192 logging.warning('Removed %d lost files', len(previous))
1193 for filename in previous:
1194 self._lru.pop(filename)
maruele6fc9382017-05-04 09:03:48 -07001195 self._save()
maruel36a963d2016-04-08 17:15:49 -07001196
1197 # What remains to be done is to hash every single item to
1198 # detect corruption, then save to ensure state.json is up to date.
1199 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1200 # TODO(maruel): Let's revisit once directory metadata is stored in
1201 # state.json so only the files that had been mapped since the last cleanup()
1202 # call are manually verified.
1203 #
1204 #with self._lock:
1205 # for digest in self._lru:
1206 # if not isolated_format.is_valid_hash(
1207 # self._path(digest), self.hash_algo):
1208 # self.evict(digest)
1209 # logging.info('Deleted corrupted item: %s', digest)
1210
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001211 def touch(self, digest, size):
vadimsh129e5942017-01-04 16:42:46 -08001212 """Verifies an actual file is valid and bumps its LRU position.
1213
1214 Returns False if the file is missing or invalid. Doesn't kick it from LRU
1215 though (call 'evict' explicitly).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001216
1217 Note that is doesn't compute the hash so it could still be corrupted if the
1218 file size didn't change.
1219
1220 TODO(maruel): More stringent verification while keeping the check fast.
1221 """
1222 # Do the check outside the lock.
1223 if not is_valid_file(self._path(digest), size):
1224 return False
1225
1226    # Update its LRU position.
1227 with self._lock:
1228 if digest not in self._lru:
1229 return False
1230 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001231 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001232 return True
1233
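  # Hypothetical sketch: touch() only validates and bumps the LRU position,
  # so a False return must be handled by the caller (e.g. by refetching):
  #
  #   if cache.touch(digest, expected_size):
  #     with cache.getfileobj(digest) as f:
  #       use(f)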
1234 def evict(self, digest):
1235 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001236      # Do not check for 'digest == self._protected': evicting the protected
maruel083fa552016-04-08 14:38:01 -07001237      # digest can be legitimate when the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001238 self._lru.pop(digest)
1239 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1240
tansell9e04a8d2016-07-28 09:31:59 -07001241 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001242 try:
tansell9e04a8d2016-07-28 09:31:59 -07001243 f = fs.open(self._path(digest), 'rb')
1244 with self._lock:
1245 self._used.append(self._lru[digest])
1246 return f
nodir445097b2016-06-03 22:50:26 -07001247 except IOError:
1248 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001249
1250 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001251 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001252 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001253 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001254 path = self._path(digest)
1255    # A stale broken file may remain. It is possible for the file to have its
1256    # write access bit removed, which would cause the file_write() call to fail
1257    # to open it in write mode. Take no chances here.
1258 file_path.try_remove(path)
1259 try:
1260 size = file_write(path, content)
1261 except:
1262      # There are two possible places where an exception can occur:
1263 # 1) Inside |content| generator in case of network or unzipping errors.
1264 # 2) Inside file_write itself in case of disk IO errors.
1265 # In any case delete an incomplete file and propagate the exception to
1266 # caller, it will be logged there.
1267 file_path.try_remove(path)
1268 raise
1269 # Make the file read-only in the cache. This has a few side-effects since
1270    # the file node is modified, so every directory entry to this file becomes
1271 # read-only. It's fine here because it is a new file.
1272 file_path.set_read_only(path, True)
1273 with self._lock:
1274 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001275 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001276
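  # A minimal sketch (hypothetical |digest| and |blob|): |content| is an
  # iterable of byte chunks, e.g. a network stream; here a single chunk:
  #
  #   cache.write(digest, [blob])
  #   with cache.getfileobj(digest) as f:
  #     assert f.read() == blob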
nodirf33b8d62016-10-26 22:34:58 -07001277 def get_oldest(self):
1278 """Returns digest of the LRU item or None."""
1279 try:
1280 return self._lru.get_oldest()[0]
1281 except KeyError:
1282 return None
1283
1284 def get_timestamp(self, digest):
1285 """Returns timestamp of last use of an item.
1286
1287 Raises KeyError if item is not found.
1288 """
1289 return self._lru.get_timestamp(digest)
1290
1291 def trim(self):
1292 """Forces retention policies."""
1293 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001294 return self._trim()
nodirf33b8d62016-10-26 22:34:58 -07001295
maruele6fc9382017-05-04 09:03:48 -07001296 def _load(self, trim, time_fn):
maruel2e8d0f52016-07-16 07:51:29 -07001297 """Loads state of the cache from json file.
1298
1299 If cache_dir does not exist on disk, it is created.
1300 """
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001301 self._lock.assert_locked()
1302
maruel2e8d0f52016-07-16 07:51:29 -07001303 if not fs.isfile(self.state_file):
1304 if not os.path.isdir(self.cache_dir):
1305 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001306 else:
maruel2e8d0f52016-07-16 07:51:29 -07001307 # Load state of the cache.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001308 try:
1309 self._lru = lru.LRUDict.load(self.state_file)
1310 except ValueError as err:
1311        logging.error('Failed to load cache state: %s', err)
1312 # Don't want to keep broken state file.
1313 file_path.try_remove(self.state_file)
maruele6fc9382017-05-04 09:03:48 -07001314 if time_fn:
1315 self._lru.time_fn = time_fn
nodirf33b8d62016-10-26 22:34:58 -07001316 if trim:
1317 self._trim()
maruel2e8d0f52016-07-16 07:51:29 -07001318 # We want the initial cache size after trimming, i.e. what is readily
1319    # available.
1320 self._initial_number_items = len(self._lru)
1321 self._initial_size = sum(self._lru.itervalues())
1322 if self._evicted:
1323 logging.info(
1324 'Trimming evicted items with the following sizes: %s',
1325 sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001326
1327 def _save(self):
1328 """Saves the LRU ordering."""
1329 self._lock.assert_locked()
1330 if sys.platform != 'win32':
1331 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001332 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001333 # Necessary otherwise the file can't be created.
1334 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001335 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001336 file_path.set_read_only(self.state_file, False)
1337 self._lru.save(self.state_file)
1338
1339 def _trim(self):
1340    """Enforces cache policies, making sure enough free space exists."""
1341 self._lock.assert_locked()
1342
1343 # Ensure maximum cache size.
1344 if self.policies.max_cache_size:
1345 total_size = sum(self._lru.itervalues())
1346 while total_size > self.policies.max_cache_size:
maruel2e8d0f52016-07-16 07:51:29 -07001347 total_size -= self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001348
1349 # Ensure maximum number of items in the cache.
1350 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1351 for _ in xrange(len(self._lru) - self.policies.max_items):
maruel2e8d0f52016-07-16 07:51:29 -07001352 self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001353
1354 # Ensure enough free space.
1355 self._free_disk = file_path.get_free_space(self.cache_dir)
kjlubickea9abf02016-06-01 09:34:33 -07001356 trimmed_due_to_space = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001357 while (
1358 self.policies.min_free_space and
1359 self._lru and
1360 self._free_disk < self.policies.min_free_space):
kjlubickea9abf02016-06-01 09:34:33 -07001361 trimmed_due_to_space += 1
maruel2e8d0f52016-07-16 07:51:29 -07001362 self._remove_lru_file(True)
kjlubickea9abf02016-06-01 09:34:33 -07001363
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001364 if trimmed_due_to_space:
1365 total_usage = sum(self._lru.itervalues())
1366 usage_percent = 0.
1367 if total_usage:
kjlubickea9abf02016-06-01 09:34:33 -07001368 usage_percent = 100. * float(total_usage) / self.policies.max_cache_size
1369
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001370 logging.warning(
kjlubickea9abf02016-06-01 09:34:33 -07001371 'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
1372 ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
1373 trimmed_due_to_space,
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001374 self._free_disk / 1024.,
1375 total_usage / 1024.,
kjlubickea9abf02016-06-01 09:34:33 -07001376 usage_percent,
1377 self.policies.max_cache_size / 1024.)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001378 self._save()
maruele6fc9382017-05-04 09:03:48 -07001379 return trimmed_due_to_space
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001380
1381 def _path(self, digest):
1382 """Returns the path to one item."""
1383 return os.path.join(self.cache_dir, digest)
1384
maruel2e8d0f52016-07-16 07:51:29 -07001385 def _remove_lru_file(self, allow_protected):
1386    """Removes the least recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001387 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001388 try:
nodireabc11c2016-10-18 16:37:28 -07001389 digest, (size, _) = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001390 if not allow_protected and digest == self._protected:
maruele6fc9382017-05-04 09:03:48 -07001391 raise Error(
1392 'Not enough space to fetch the whole isolated tree; %sb free, min '
1393 'is %sb' % (self._free_disk, self.policies.min_free_space))
maruel083fa552016-04-08 14:38:01 -07001394 except KeyError:
1395 raise Error('Nothing to remove')
nodireabc11c2016-10-18 16:37:28 -07001396 digest, (size, _) = self._lru.pop_oldest()
vadimsh129e5942017-01-04 16:42:46 -08001397 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001398 self._delete_file(digest, size)
1399 return size
1400
1401 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1402 """Adds an item into LRU cache marking it as a newest one."""
1403 self._lock.assert_locked()
1404 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001405 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001406 self._added.append(size)
1407 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001408 self._free_disk -= size
1409 # Do a quicker version of self._trim(). It only enforces free disk space,
1410 # not cache size limits. It doesn't actually look at real free disk space,
1411 # only uses its cache values. self._trim() will be called later to enforce
1412 # real trimming but doing this quick version here makes it possible to map
1413 # an isolated that is larger than the current amount of free disk space when
1414 # the cache size is already large.
1415 while (
1416 self.policies.min_free_space and
1417 self._lru and
1418 self._free_disk < self.policies.min_free_space):
maruel2e8d0f52016-07-16 07:51:29 -07001419 self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001420
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001421 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1422 """Deletes cache file from the file system."""
1423 self._lock.assert_locked()
1424 try:
1425 if size == UNKNOWN_FILE_SIZE:
vadimsh129e5942017-01-04 16:42:46 -08001426 try:
1427 size = fs.stat(self._path(digest)).st_size
1428 except OSError:
1429 size = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001430 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001431 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001432 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001433 except OSError as e:
vadimsh129e5942017-01-04 16:42:46 -08001434 if e.errno != errno.ENOENT:
1435        logging.error('Error attempting to delete a file %s:\n%s', digest, e)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001436
1437
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001438class IsolatedBundle(object):
1439 """Fetched and parsed .isolated file with all dependencies."""
1440
Vadim Shtayura3148e072014-09-02 18:51:52 -07001441 def __init__(self):
1442 self.command = []
1443 self.files = {}
1444 self.read_only = None
1445 self.relative_cwd = None
1446    # The main .isolated file, an IsolatedFile instance.
1447 self.root = None
1448
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001449 def fetch(self, fetch_queue, root_isolated_hash, algo):
1450    """Fetches the .isolated and all the included .isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001451
1452 It enables support for "included" .isolated files. They are processed in
1453 strict order but fetched asynchronously from the cache. This is important so
1454 that a file in an included .isolated file that is overridden by an embedding
1455 .isolated file is not fetched needlessly. The includes are fetched in one
1456 pass and the files are fetched as soon as all the ones on the left-side
1457    of the tree have been fetched.
1458
1459 The prioritization is very important here for nested .isolated files.
1460 'includes' have the highest priority and the algorithm is optimized for both
1461    deep and wide trees. A deep one is a long chain of .isolated files referenced
1462 one at a time by one item in 'includes'. A wide one has a large number of
1463 'includes' in a single .isolated file. 'left' is defined as an included
1464 .isolated file earlier in the 'includes' list. So the order of the elements
1465 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001466
1467 As a side effect this method starts asynchronous fetch of all data files
1468 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1469 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001470 """
1471 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1472
1473 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1474 pending = {}
1475 # Set of hashes of already retrieved items to refuse recursive includes.
1476 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001477    # Set of IsolatedFile instances whose data files have already been fetched.
1478 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001479
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001480 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001481 h = isolated_file.obj_hash
1482 if h in seen:
1483 raise isolated_format.IsolatedError(
1484 'IsolatedFile %s is retrieved recursively' % h)
1485 assert h not in pending
1486 seen.add(h)
1487 pending[h] = isolated_file
1488 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1489
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001490 # Start fetching root *.isolated file (single file, not the whole bundle).
1491 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001492
1493 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001494 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001495 item_hash = fetch_queue.wait(pending)
1496 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001497 with fetch_queue.cache.getfileobj(item_hash) as f:
1498 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001499
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001500 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001501 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001502 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001503
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001504 # Always fetch *.isolated files in traversal order, waiting if necessary
1505 # until next to-be-processed node loads. "Waiting" is done by yielding
1506 # back to the outer loop, that waits until some *.isolated is loaded.
1507 for node in isolated_format.walk_includes(self.root):
1508 if node not in processed:
1509 # Not visited, and not yet loaded -> wait for it to load.
1510 if not node.is_loaded:
1511 break
1512 # Not visited and loaded -> process it and continue the traversal.
1513 self._start_fetching_files(node, fetch_queue)
1514 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001515
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001516 # All *.isolated files should be processed by now and only them.
1517 all_isolateds = set(isolated_format.walk_includes(self.root))
1518 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001519
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001520 # Extract 'command' and other bundle properties.
1521 for node in isolated_format.walk_includes(self.root):
1522 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001523 self.relative_cwd = self.relative_cwd or ''
1524
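  # A hypothetical illustration of the traversal order above: if root.isolated
  # includes [a, b] and a.isolated includes [c], nodes are processed in the
  # order root, a, c, b. A file listed in both root and b keeps root's entry,
  # so b's copy is never fetched.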
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001525 def _start_fetching_files(self, isolated, fetch_queue):
1526 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001527
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001528 Modifies self.files.
1529 """
maruel10bea7b2016-12-07 05:03:49 -08001530 files = isolated.data.get('files', {})
1531 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1532 for filepath, properties in files.iteritems():
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001533 # Root isolated has priority on the files being mapped. In particular,
1534 # overridden files must not be fetched.
1535 if filepath not in self.files:
1536 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001537
1538        # Make sure that, if the isolated tree is read-only, the mode doesn't
1539        # have write bits.
1540 if 'm' in properties and self.read_only:
1541 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1542
1543 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001544 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001545 fetch_queue.add(
1546 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1547
1548 def _update_self(self, node):
1549 """Extracts bundle global parameters from loaded *.isolated file.
1550
1551 Will be called with each loaded *.isolated file in order of traversal of
1552 isolated include graph (see isolated_format.walk_includes).
1553 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001554 # Grabs properties.
1555 if not self.command and node.data.get('command'):
1556 # Ensure paths are correctly separated on windows.
1557      # Ensure paths are correctly separated on Windows.
1558 if self.command:
1559 self.command[0] = self.command[0].replace('/', os.path.sep)
1560 self.command = tools.fix_python_path(self.command)
1561 if self.read_only is None and node.data.get('read_only') is not None:
1562 self.read_only = node.data['read_only']
1563 if (self.relative_cwd is None and
1564 node.data.get('relative_cwd') is not None):
1565 self.relative_cwd = node.data['relative_cwd']
1566
1567
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001568def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001569 """Returns Storage class that can upload and download from |namespace|.
1570
1571 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001572 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001573 namespace: isolate namespace to operate in, also defines hashing and
1574 compression scheme used, i.e. namespace names that end with '-gzip'
1575 store compressed data.
1576
1577 Returns:
1578 Instance of Storage.
1579 """
aludwin81178302016-11-30 17:18:49 -08001580 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001581
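# A minimal usage sketch (hypothetical server URL and |items|):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     uploaded = storage.upload_items(items)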
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001582
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001583def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001584 """Uploads the given tree to the given url.
1585
1586 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001587 base_url: The url of the isolate server to upload to.
1588 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001589 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001590 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001591 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001592 # Filter out symlinks, since they are not represented by items on isolate
1593 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001594 items = []
1595 seen = set()
1596 skipped = 0
1597 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001598 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001599 if 'l' not in metadata and filepath not in seen:
1600 seen.add(filepath)
1601 item = FileItem(
1602 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001603 digest=metadata['h'],
1604 size=metadata['s'],
1605 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001606 items.append(item)
1607 else:
1608 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001609
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001610 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001611 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001612 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001613
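# A sketch of the |infiles| shape expected above (hypothetical paths and
# digests):
#
#   infiles = [
#     (u'/abs/a.txt', {'h': 'deadbeef', 's': 12}),   # regular file
#     (u'/abs/link', {'l': u'a.txt'}),               # symlink, skipped
#   ]
#   upload_tree('https://isolate.example.com', infiles, 'default-gzip')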
1614
maruel4409e302016-07-19 14:25:51 -07001615def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001616 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001617
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001618 Arguments:
1619 isolated_hash: hash of the root *.isolated file.
1620 storage: Storage class that communicates with isolate storage.
1621 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001622 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001623 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001624
1625 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001626 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001627 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001628 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001629 'fetch_isolated(%s, %s, %s, %s, %s)',
1630 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001631 # Hash algorithm to use, defined by namespace |storage| is using.
1632 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001633 with cache:
1634 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001635 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001636
1637 with tools.Profiler('GetIsolateds'):
1638 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001639 if not isolated_format.is_valid_hash(isolated_hash, algo):
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001640 logging.debug('%s is not a valid hash, assuming a file '
1641 '(algo was %s, hash size was %d)',
1642 isolated_hash, algo(), algo().digest_size)
maruel1ceb3872015-10-14 06:10:44 -07001643 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001644 try:
maruel1ceb3872015-10-14 06:10:44 -07001645 isolated_hash = fetch_queue.inject_local_file(path, algo)
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001646 except IOError as e:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001647 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001648            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001649 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001650
1651 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001652 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001653
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001654 with tools.Profiler('GetRest'):
1655 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001656 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001657 create_directories(outdir, bundle.files)
1658 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001659
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001660 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001661 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001662 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001663
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001664 # Multimap: digest -> list of pairs (path, props).
1665 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001666 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001667 if 'h' in props:
1668 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001669
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001670 # Now block on the remaining files to be downloaded and mapped.
1671 logging.info('Retrieving remaining files (%d of them)...',
1672 fetch_queue.pending_count)
1673 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001674 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001675 while remaining:
1676 detector.ping()
1677
1678 # Wait for any item to finish fetching to cache.
1679 digest = fetch_queue.wait(remaining)
1680
tansell9e04a8d2016-07-28 09:31:59 -07001681 # Create the files in the destination using item in cache as the
1682 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001683 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001684 fullpath = os.path.join(outdir, filepath)
1685
1686 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001687 filetype = props.get('t', 'basic')
1688
1689 if filetype == 'basic':
1690 file_mode = props.get('m')
1691 if file_mode:
1692 # Ignore all bits apart from the user
1693 file_mode &= 0700
1694 putfile(
1695 srcfileobj, fullpath, file_mode,
1696 use_symlink=use_symlinks)
1697
tansell26de79e2016-11-13 18:41:11 -08001698 elif filetype == 'tar':
1699 basedir = os.path.dirname(fullpath)
1700 with tarfile.TarFile(fileobj=srcfileobj) as extractor:
1701 for ti in extractor:
1702 if not ti.isfile():
1703 logging.warning(
1704                  'Path(%r) is not a regular file (%s), skipped',
1705 ti.name, ti.type)
1706 continue
1707 fp = os.path.normpath(os.path.join(basedir, ti.name))
1708            if not fp.startswith(basedir):
1709              logging.error(
1710                  'Path(%r) is outside root directory',
1711                  fp)
              continue
1712 ifd = extractor.extractfile(ti)
1713 file_path.ensure_tree(os.path.dirname(fp))
1714 putfile(ifd, fp, 0700, ti.size)
1715
tanselle4288c32016-07-28 09:45:40 -07001716 else:
1717 raise isolated_format.IsolatedError(
1718 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001719
1720 # Report progress.
1721 duration = time.time() - last_update
1722 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1723 msg = '%d files remaining...' % len(remaining)
1724 print msg
1725 logging.info(msg)
1726 last_update = time.time()
1727
1728 # Cache could evict some items we just tried to fetch, it's a fatal error.
1729 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001730 raise isolated_format.MappingError(
1731 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001732 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001733
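# A minimal end-to-end sketch (hypothetical hash and output directory):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     bundle = fetch_isolated(
#         isolated_hash='deadbeef',
#         storage=storage,
#         cache=MemoryCache(),
#         outdir=u'/tmp/out',
#         use_symlinks=False)
#   print(' '.join(bundle.command))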
1734
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001735def directory_to_metadata(root, algo, blacklist):
1736 """Returns the FileItem list and .isolated metadata for a directory."""
1737 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001738 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001739 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001740 metadata = {
1741 relpath: isolated_format.file_to_metadata(
kjlubick80596f02017-04-28 08:13:19 -07001742 os.path.join(root, relpath), {}, 0, algo, False)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001743 for relpath in paths
1744 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001745 for v in metadata.itervalues():
1746 v.pop('t')
1747 items = [
1748 FileItem(
1749 path=os.path.join(root, relpath),
1750 digest=meta['h'],
1751 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001752 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001753 for relpath, meta in metadata.iteritems() if 'h' in meta
1754 ]
1755 return items, metadata
1756
1757
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001758def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001759  """Stores every entry and returns the relevant data.
1760
1761 Arguments:
1762 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001763 files: list of file paths to upload. If a directory is specified, a
1764 .isolated file is created and its hash is returned.
1765 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001766
1767 Returns:
1768 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1769 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001770 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001771 assert all(isinstance(i, unicode) for i in files), files
1772 if len(files) != len(set(map(os.path.abspath, files))):
1773 raise Error('Duplicate entries found.')
1774
maruel064c0a32016-04-05 11:47:15 -07001775 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001776 results = []
1777 # The temporary directory is only created as needed.
1778 tempdir = None
1779 try:
1780 # TODO(maruel): Yield the files to a worker thread.
1781 items_to_upload = []
1782 for f in files:
1783 try:
1784 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001785 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001786 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001787 items, metadata = directory_to_metadata(
1788 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001789
1790 # Create the .isolated file.
1791 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001792 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1793 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001794 os.close(handle)
1795 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001796 'algo':
1797 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001798 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001799 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001800 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001801 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001802 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001803 items_to_upload.extend(items)
1804 items_to_upload.append(
1805 FileItem(
1806 path=isolated,
1807 digest=h,
maruel12e30012015-10-09 11:55:35 -07001808 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001809 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001810 results.append((h, f))
1811
maruel12e30012015-10-09 11:55:35 -07001812 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001813 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001814 items_to_upload.append(
1815 FileItem(
1816 path=filepath,
1817 digest=h,
maruel12e30012015-10-09 11:55:35 -07001818 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001819 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001820 results.append((h, f))
1821 else:
1822        raise Error('%s is neither a file nor a directory.' % f)
1823 except OSError:
1824 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07001825 uploaded = storage.upload_items(items_to_upload)
1826 cold = [i for i in items_to_upload if i in uploaded]
1827 hot = [i for i in items_to_upload if i not in uploaded]
1828 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001829 finally:
maruel12e30012015-10-09 11:55:35 -07001830 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001831 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001832
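# A sketch of consuming the return value (hypothetical storage and paths;
# the blacklist callback returns True for files to omit):
#
#   results, cold, hot = archive_files_to_storage(
#       storage, [u'/abs/some_dir'], lambda _rel: False)
#   isolated_hash = results[0][0]  # first entry is the .isolated file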
1833
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001834def archive(out, namespace, files, blacklist):
1835 if files == ['-']:
1836 files = sys.stdin.readlines()
1837
1838 if not files:
1839 raise Error('Nothing to upload')
1840
1841 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001842 blacklist = tools.gen_blacklist(blacklist)
1843 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001844 # Ignore stats.
1845 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001846 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1847
1848
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001849@subcommand.usage('<file1..fileN> or - to read from stdin')
1850def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001851 """Archives data to the server.
1852
1853  If a directory is specified, a .isolated file is created and the whole
1854  directory is uploaded. Then this .isolated file can be included in another
1855  one to run commands.
1856
1857  The command outputs each file that was processed with its content hash. For
1858 directories, the .isolated generated for the directory is listed as the
1859 directory entry itself.
1860 """
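  # Typical invocation (hypothetical server URL):
  #   isolateserver.py archive -I https://isolate.example.com \
  #       --namespace default-gzip ./out/some_dir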
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001861 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001862 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001863 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001864 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001865 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001866 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001867 except Error as e:
1868 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001869 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001870
1871
1872def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001873 """Download data from the server.
1874
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001875 It can either download individual files or a complete tree from a .isolated
1876 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001877 """
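  # Typical invocations (hypothetical server URL and digests):
  #   isolateserver.py download -I https://isolate.example.com \
  #       -s 0123abcd -t ./out
  #   isolateserver.py download -I https://isolate.example.com \
  #       -f 0123abcd payload.bin -t ./out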
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001878 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001879 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001880 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001881 help='hash of an isolated file, .isolated file content is discarded, use '
1882 '--file if you need it')
1883 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001884 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1885 help='hash and destination of a file, can be used multiple times')
1886 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001887 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001888 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001889 parser.add_option(
1890 '--use-symlinks', action='store_true',
1891 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001892 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001893 options, args = parser.parse_args(args)
1894 if args:
1895 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001896 if not file_path.enable_symlink():
1897 logging.error('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001898
nodir55be77b2016-05-03 09:39:57 -07001899 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001900 if bool(options.isolated) == bool(options.file):
1901 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001902 if not options.cache and options.use_symlinks:
1903    parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001904
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001905 cache = process_cache_options(options)
maruel2e8d0f52016-07-16 07:51:29 -07001906 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07001907 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001908 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001909 if (fs.isfile(options.target) or
1910 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001911 parser.error(
1912 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001913 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001914 # Fetching individual files.
1915 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001916 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001917 channel = threading_utils.TaskChannel()
1918 pending = {}
1919 for digest, dest in options.file:
1920 pending[digest] = dest
1921 storage.async_fetch(
1922 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001923 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001924 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001925 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001926 functools.partial(file_write, os.path.join(options.target, dest)))
1927 while pending:
1928 fetched = channel.pull()
1929 dest = pending.pop(fetched)
1930 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001931
Vadim Shtayura3172be52013-12-03 12:49:05 -08001932 # Fetching whole isolated tree.
1933 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001934 with cache:
1935 bundle = fetch_isolated(
1936 isolated_hash=options.isolated,
1937 storage=storage,
1938 cache=cache,
maruel4409e302016-07-19 14:25:51 -07001939 outdir=options.target,
1940 use_symlinks=options.use_symlinks)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001941 if bundle.command:
1942 rel = os.path.join(options.target, bundle.relative_cwd)
1943 print('To run this test please run from the directory %s:' %
1944            rel)
1945 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001946
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001947 return 0
1948
1949
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001950def add_archive_options(parser):
1951 parser.add_option(
1952 '--blacklist',
1953 action='append', default=list(DEFAULT_BLACKLIST),
1954 help='List of regexp to use as blacklist filter when uploading '
1955 'directories')
1956
1957
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001958def add_isolate_server_options(parser):
1959 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001960 parser.add_option(
1961 '-I', '--isolate-server',
1962 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001963 help='URL of the Isolate Server to use. Defaults to the environment '
1964 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1965 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001966 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001967 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001968 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001969 '--namespace', default='default-gzip',
1970 help='The namespace to use on the Isolate Server, default: %default')
1971
1972
nodir55be77b2016-05-03 09:39:57 -07001973def process_isolate_server_options(
1974 parser, options, set_exception_handler, required):
1975 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001976
1977 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001978 """
1979 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001980 if required:
1981 parser.error('--isolate-server is required.')
1982 return
1983
aludwind7b7b7e2017-06-29 16:38:50 -07001984 if options.grpc_proxy:
1985 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001986 else:
1987 try:
1988 options.isolate_server = net.fix_url(options.isolate_server)
1989 except ValueError as e:
1990 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001991 if set_exception_handler:
1992 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001993 try:
1994 return auth.ensure_logged_in(options.isolate_server)
1995 except ValueError as e:
1996 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001997
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001998
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001999def add_cache_options(parser):
2000 cache_group = optparse.OptionGroup(parser, 'Cache management')
2001 cache_group.add_option(
2002 '--cache', metavar='DIR',
2003 help='Directory to keep a local cache of the files. Accelerates download '
2004 'by reusing already downloaded files. Default=%default')
2005 cache_group.add_option(
2006 '--max-cache-size',
2007 type='int',
2008 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002009 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002010 help='Trim if the cache gets larger than this value, default=%default')
2011 cache_group.add_option(
2012 '--min-free-space',
2013 type='int',
2014 metavar='NNN',
2015 default=2*1024*1024*1024,
2016 help='Trim if disk free space becomes lower than this value, '
2017 'default=%default')
2018 cache_group.add_option(
2019 '--max-items',
2020 type='int',
2021 metavar='NNN',
2022 default=100000,
2023 help='Trim if more than this number of items are in the cache '
2024 'default=%default')
2025 parser.add_option_group(cache_group)
2026
2027
maruele6fc9382017-05-04 09:03:48 -07002028def process_cache_options(options, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002029 if options.cache:
2030 policies = CachePolicies(
2031 options.max_cache_size, options.min_free_space, options.max_items)
2032
2033 # |options.cache| path may not exist until DiskCache() instance is created.
2034 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002035 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002036 policies,
nodirf33b8d62016-10-26 22:34:58 -07002037 isolated_format.get_hash_algo(options.namespace),
maruele6fc9382017-05-04 09:03:48 -07002038 **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002039 else:
2040 return MemoryCache()
2041
2042
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002043class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002044 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002045 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002046 self,
2047 version=__version__,
2048 prog=os.path.basename(sys.modules[__name__].__file__),
2049 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002050 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002051
2052 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002053 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002054 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002055 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002056 return options, args
2057
2058
2059def main(args):
2060 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002061 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002062
2063
2064if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07002065 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002066 fix_encoding.fix_encoding()
2067 tools.disable_buffering()
2068 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002069 sys.exit(main(sys.argv[1:]))