#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.1'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


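# A minimal usage sketch (this helper is not part of the original module): it
# shows which relative paths the DEFAULT_BLACKLIST regexps filter out when
# scanning a directory tree.
def _example_default_blacklist():
  blacklist = [re.compile(r) for r in DEFAULT_BLACKLIST]
  paths = [
      u'foo.pyc', os.path.join(u'src', u'.git'),
      os.path.join(u'src', u'main.py')]
  kept = [p for p in paths if not any(b.match(p) for b in blacklist)]
  assert kept == [os.path.join(u'src', u'main.py')]

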
class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


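# A minimal sketch of the two helpers above (this helper is not part of the
# original module): file_write() consumes a generator and creates intermediate
# directories; file_read() streams the result back in bounded chunks.
def _example_file_roundtrip(tmpdir):
  path = os.path.join(tmpdir, u'sub', u'example.bin')
  written = file_write(path, ('chunk%d' % i for i in xrange(3)))
  data = ''.join(file_read(path, chunk_size=4))
  assert written == len(data) and data == 'chunk0chunk1chunk2'

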
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (such
  # as the standard library's open("test.txt")), name will end up being a str.
  # We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime and causes a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


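# A minimal sketch of a putfile() call site (this helper is not part of the
# original module): it materializes a cached item at |dstpath| and lets
# putfile() choose between linking and copying based on |file_mode| and on
# whether the source file object exposes a real path.
def _example_putfile(cache, digest, dstpath):
  with cache.getfileobj(digest) as srcfileobj:
    putfile(srcfileobj, dstpath, file_mode=0400, use_symlink=False)

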
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


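# A minimal round-trip sketch (this helper is not part of the original
# module): zip_compress() and zip_decompress() are inverses for in-memory
# chunks; the level would normally come from get_zip_compression_level() below.
def _example_zip_roundtrip():
  compressed = list(zip_compress(['hello ', 'world'], level=7))
  assert ''.join(zip_decompress(compressed)) == 'hello world'

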
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Compute the full set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


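# A minimal usage sketch (this helper is not part of the original module):
# given isolated-style relative paths, every intermediate directory is created
# under |base_directory|.
def _example_create_directories(base_directory):
  files = [os.path.join(u'a', u'b', u'c.txt'), os.path.join(u'a', u'd.txt')]
  create_directories(base_directory, files)
  assert fs.isdir(os.path.join(base_directory, u'a', u'b'))

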
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


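# A minimal sketch (this helper is not part of the original module): an Item's
# digest is computed lazily by prepare(), the same call Storage.upload_items()
# makes before querying the server; |hash_algo| would be e.g. hashlib.sha1 for
# the default namespace.
def _example_buffer_item(hash_algo):
  item = BufferItem('some data', high_priority=True)
  item.prepare(hash_algo)
  return item.digest, item.size

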
class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

472 def abort(self):
473 """Cancels any pending or future operations."""
474 # This is not strictly theadsafe, but in the worst case the logging message
475 # will be printed twice. Not a big deal. In other places it is assumed that
476 # unprotected reads and writes to _aborted are serializable (it is true
477 # for python) and thus no locking is used.
478 if not self._aborted:
479 logging.warning('Aborting... It can take a while.')
480 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000481
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000482 def __enter__(self):
483 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400484 assert not self._prev_sig_handlers, self._prev_sig_handlers
485 for s in (signal.SIGINT, signal.SIGTERM):
486 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000487 return self
488
489 def __exit__(self, _exc_type, _exc_value, _traceback):
490 """Context manager interface."""
491 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400492 while self._prev_sig_handlers:
493 s, h = self._prev_sig_handlers.popitem()
494 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000495 return False
496
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole content is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


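# A minimal end-to-end upload sketch (this helper is not part of the original
# module): get_storage(), assumed to be defined further down in this file,
# builds the StorageApi for the namespace; Storage is used as a context
# manager so its thread pools and signal handlers get cleaned up.
def _example_upload(server_url, namespace):
  with get_storage(server_url, namespace) as storage:
    items = [BufferItem('payload')]
    uploaded = storage.upload_items(items)
    return [i.digest for i in items], uploaded

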
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


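# A worked example (this helper is not part of the original module): with 100
# equally-sized items, the batches follow ITEMS_PER_CONTAINS_QUERIES, so the
# generator yields groups of 20, 20 and 50 items, then the 10 leftovers.
def _example_batching():
  items = [BufferItem('x' * 10) for _ in xrange(100)]
  sizes = [len(batch) for batch in batch_items_for_check(items)]
  assert sizes == [20, 20, 50, 10]

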
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


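# A minimal sketch of the FetchQueue flow (this helper is not part of the
# original module): enqueue every digest, then block until each one has landed
# in |cache|.
def _example_fetch_all(storage, cache, digests):
  queue = FetchQueue(storage, cache)
  for digest in digests:
    queue.add(digest)
  remaining = set(digests)
  while remaining:
    remaining.discard(queue.wait(remaining))
  assert queue.verify_all_cached()

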
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      logging.error('%s; last chunk:\n%s', msg, chunk)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      logging.error('%s; last chunk:\n%s', msg, chunk)
      # TODO(aludwin): actually raise an error. In the short term, we'll
      # continue to let this through to verify that we see the logs when we
      # expect to; if we just return IOError, the download will be retried and
      # the error will be masked.
      # raise IOError(msg)


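# A minimal sketch (this helper is not part of the original module): wrapping
# an in-memory stream; run() re-yields the chunks and raises IOError on a size
# mismatch (digest mismatches are currently only logged, per the TODO above).
def _example_verified_stream():
  import hashlib
  payload = 'some data'
  verifier = FetchStreamVerifier(
      [payload], hashlib.sha1, hashlib.sha1(payload).hexdigest(), len(payload))
  assert ''.join(verifier.run()) == payload

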
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # Track the size of the evicted content, like write() does for
        # self._added.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0


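# A minimal round-trip sketch (this helper is not part of the original
# module): MemoryCache is the in-process stand-in for DiskCache; |digest| is
# an arbitrary illustrative key here, not a real hash.
def _example_memory_cache():
  with MemoryCache() as cache:
    digest = cache.write('0123abcd', ['some ', 'data'])
    assert digest in cache
    assert cache.getfileobj(digest).read() == 'some data'

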
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


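# Illustrative values only (these numbers are not from the original module): a
# DiskCache capped at 20 GiB and 100k items that also keeps at least 2 GiB of
# disk free.
def _example_cache_policies():
  return CachePolicies(
      max_cache_size=20 * 1024 ** 3,
      min_free_space=2 * 1024 ** 3,
      max_items=100000)

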
1093class DiskCache(LocalCache):
1094 """Stateful LRU cache in a flat hash table in a directory.
1095
1096 Saves its state as json file.
1097 """
maruel12e30012015-10-09 11:55:35 -07001098 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001099
maruele6fc9382017-05-04 09:03:48 -07001100 def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001101 """
1102 Arguments:
1103 cache_dir: directory where to place the cache.
1104 policies: cache retention policies.
1105 algo: hashing algorithm used.
nodirf33b8d62016-10-26 22:34:58 -07001106 trim: if True to enforce |policies| right away.
1107 It can be done later by calling trim() explicitly.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001108 """
maruel064c0a32016-04-05 11:47:15 -07001109 # All protected methods (starting with '_') except _path should be called
1110 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001111 super(DiskCache, self).__init__()
1112 self.cache_dir = cache_dir
1113 self.policies = policies
1114 self.hash_algo = hash_algo
1115 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001116 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001117 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001118 # Current cached free disk space. It is updated by self._trim().
nodirf33b8d62016-10-26 22:34:58 -07001119 file_path.ensure_tree(self.cache_dir)
1120 self._free_disk = file_path.get_free_space(self.cache_dir)
maruel2e8d0f52016-07-16 07:51:29 -07001121 # The first item in the LRU cache that must not be evicted during this run
1122 # since it was referenced. All items more recent that _protected in the LRU
1123 # cache are also inherently protected. It could be a set() of all items
1124 # referenced but this increases memory usage without a use case.
1125 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001126 # Cleanup operations done by self._load(), if any.
1127 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001128 with tools.Profiler('Setup'):
1129 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001130 self._load(trim, time_fn)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001131
nodirbe642ff2016-06-09 15:51:51 -07001132 def __contains__(self, digest):
1133 with self._lock:
1134 return digest in self._lru
1135
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001136 def __enter__(self):
1137 return self
1138
1139 def __exit__(self, _exc_type, _exec_value, _traceback):
1140 with tools.Profiler('CleanupTrimming'):
1141 with self._lock:
1142 self._trim()
1143
1144 logging.info(
1145 '%5d (%8dkb) added',
1146 len(self._added), sum(self._added) / 1024)
1147 logging.info(
1148 '%5d (%8dkb) current',
1149 len(self._lru),
1150 sum(self._lru.itervalues()) / 1024)
1151 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001152 '%5d (%8dkb) evicted',
1153 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001154 logging.info(
1155 ' %8dkb free',
1156 self._free_disk / 1024)
1157 return False
1158
1159 def cached_set(self):
1160 with self._lock:
1161 return self._lru.keys_set()
1162
maruel36a963d2016-04-08 17:15:49 -07001163 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001164 """Cleans up the cache directory.
1165
1166 Ensures there is no unknown files in cache_dir.
1167 Ensures the read-only bits are set correctly.
1168
1169 At that point, the cache was already loaded, trimmed to respect cache
1170 policies.
1171 """
1172 fs.chmod(self.cache_dir, 0700)
1173 # Ensure that all files listed in the state still exist and add new ones.
1174 previous = self._lru.keys_set()
1175 # It'd be faster if there were a readdir() function.
1176 for filename in fs.listdir(self.cache_dir):
1177 if filename == self.STATE_FILE:
1178 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1179 continue
1180 if filename in previous:
1181 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1182 previous.remove(filename)
1183 continue
1184
1185 # An untracked file. Delete it.
1186 logging.warning('Removing unknown file %s from cache', filename)
1187 p = self._path(filename)
1188 if fs.isdir(p):
1189 try:
1190 file_path.rmtree(p)
1191 except OSError:
1192 pass
1193 else:
1194 file_path.try_remove(p)
1195 continue
1196
1197 if previous:
1198 # Filter out entries that were not found.
1199 logging.warning('Removed %d lost files', len(previous))
1200 for filename in previous:
1201 self._lru.pop(filename)
maruele6fc9382017-05-04 09:03:48 -07001202 self._save()
maruel36a963d2016-04-08 17:15:49 -07001203
1204 # What remains to be done is to hash every single item to
1205 # detect corruption, then save to ensure state.json is up to date.
1206 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1207 # TODO(maruel): Let's revisit once directory metadata is stored in
1208 # state.json so only the files that had been mapped since the last cleanup()
1209 # call are manually verified.
1210 #
1211 #with self._lock:
1212 # for digest in self._lru:
1213 # if not isolated_format.is_valid_hash(
1214 # self._path(digest), self.hash_algo):
1215 # self.evict(digest)
1216 # logging.info('Deleted corrupted item: %s', digest)
1217
  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash, so the file could still be corrupted
    if the file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True
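
  # A minimal usage sketch (|digest| and |size| are assumed known;
  # fetch_chunks() is a hypothetical helper yielding the file content):
  #
  #   if cache.touch(digest, size):
  #     fileobj = cache.getfileobj(digest)
  #   else:
  #     cache.write(digest, fetch_chunks(digest))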

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have its
    # write access bit removed, which would cause the file_write() call to fail
    # to open it in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      # 1) Inside the |content| generator in case of network or unzipping
      #    errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry pointing to this file
    # becomes read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest
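
  # A minimal sketch (assumes the namespace's hash algorithm is SHA-1 and that
  # hashlib is imported): |content| is an iterable of byte chunks, matching
  # what the network layer yields while streaming:
  #
  #   data = 'hello'
  #   digest = hashlib.sha1(data).hexdigest()
  #   cache.write(digest, [data])
  #   assert cache.touch(digest, len(data))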

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns the timestamp of the last use of an item.

    Raises KeyError if the item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads the state of the cache from the json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s', err)
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
      if fs.isfile(self.state_file):
        file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, makes sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error(
            'Not enough space to fetch the whole isolated tree; %sb free, min '
            'is %sb' % (self._free_disk, self.policies.min_free_space))
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space
    # when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s', digest, e)


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''
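
  # A minimal sketch of driving fetch() (mirrors fetch_isolated() below;
  # |storage|, |cache| and |root_hash| are assumed to exist):
  #
  #   with cache:
  #     fetch_queue = FetchQueue(storage, cache)
  #     bundle = IsolatedBundle()
  #     bundle.fetch(fetch_queue, root_hash, storage.hash_algo)
  #     # bundle.files now maps each relative path to its properties dict.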

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(url, namespace):
  """Returns a Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression scheme used, e.g. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(isolate_storage.get_storage_api(url, namespace))
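
# A minimal usage sketch (hypothetical server URL; the namespace selects the
# hashing and compression scheme):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     uploaded = storage.upload_items(items)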


def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)
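
# A sketch of the |infiles| shape consumed above (hypothetical paths and
# digests; 'h' is the content hash, 's' the size and 'l' marks a symlink):
#
#   infiles = [
#       (u'/abs/path/foo', {'h': u'deadbeef' * 5, 's': 1024}),
#       (u'/abs/path/link', {'l': u'foo'}),  # symlink, skipped by upload_tree
#   ]
#   upload_tree('https://isolate.example.com', infiles, 'default-gzip')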


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create the file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using the item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'tar':
                basedir = os.path.dirname(fullpath)
                with tarfile.TarFile(fileobj=srcfileobj) as extractor:
                  for ti in extractor:
                    if not ti.isfile():
                      logging.warning(
                          'Path(%r) is nonfile (%s), skipped',
                          ti.name, ti.type)
                      continue
                    fp = os.path.normpath(os.path.join(basedir, ti.name))
                    if not fp.startswith(basedir):
                      logging.error(
                          'Path(%r) is outside root directory',
                          fp)
                      continue
                    ifd = extractor.extractfile(ti)
                    file_path.ensure_tree(os.path.dirname(fp))
                    putfile(ifd, fp, 0700, ti.size)

              elif filetype == 'ar':
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  if not fp.startswith(basedir):
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                    continue
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r' % filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
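
# A minimal usage sketch (mirrors CMDdownload below; |options| is assumed to
# already be validated):
#
#   cache = process_cache_options(options)
#   with get_storage(options.isolate_server, options.namespace) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=options.isolated,
#         storage=storage,
#         cache=cache,
#         outdir=options.target,
#         use_symlinks=options.use_symlinks)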


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
    FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated'))
    for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata
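
# A minimal sketch of the return shape (hypothetical directory and digests):
# |items| are FileItem objects ready for upload and |metadata| is suitable as
# the 'files' section of a .isolated file:
#
#   items, metadata = directory_to_metadata(
#       u'/abs/dir', storage.hash_algo, tools.gen_blacklist([]))
#   # metadata = {u'rel/path': {'h': u'<digest>', 's': 123, 'm': 0600}, ...}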


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
            'algo':
                isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
            'files': metadata,
            'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)

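# A minimal usage sketch (mirrors archive() below; server URL hypothetical):
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/abs/path/dir'], tools.gen_blacklist([]))
#     # results[0] is (isolated_hash, path) for the generated .isolated file.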

def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
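
# Example invocations (hypothetical server URL and isolated hash):
#
#   isolateserver.py archive -I https://isolate.example.com some_dir
#   isolateserver.py download -I https://isolate.example.com \
#       -s <isolated_hash> --target out --cache /tmp/isolate_cache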


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, **kwargs):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace),
        **kwargs)
  else:
    return MemoryCache()
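
# A minimal sketch (|options| assumed to come from a parser that went through
# add_cache_options()): a DiskCache is returned when --cache is set, else a
# MemoryCache:
#
#   cache = process_cache_options(options)
#   cache.cleanup()  # Removes unknown files and fixes read-only bits.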


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))