#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.0'

import base64
import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import threading
import time
import types
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
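# Illustrative sketch (not part of the original module): with the tuple above,
# batch_items_for_check() below issues successive /pre-upload queries of
# 20, 20, 50, 50, 50, 100, 100, ... items, clamping at the last value.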


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
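# Illustrative matches (hypothetical paths, assuming os.path.sep == '/'; not
# part of the original module): the first regexp matches 'foo/bar.pyc' and
# '.main.py.swp'; the second matches '.git' and 'a/b/.svn' but not
# 'foo.gitignore'.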


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
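# Illustrative round trip through file_write()/file_read() above (hypothetical
# path; not part of the original module):
#   file_write(u'/tmp/example.bin', ['chunk1', 'chunk2'])  # returns 12
#   assert ''.join(file_read(u'/tmp/example.bin')) == 'chunk1chunk2'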


def fileobj_path(fileobj):
  """Returns the file system path for a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created by something outside our control (for
  # example by the standard library via open("test.txt")), name will end up
  # being a str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is raised). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)
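# Illustrative sketch of the size semantics above (not part of the original
# module):
#   import io
#   src, dst = io.BytesIO('hello world'), io.BytesIO()
#   fileobj_copy(dst, src, size=5)   # copies exactly 5 bytes
#   assert dst.getvalue() == 'hello'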


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime and causes a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks have the same owner.
  - Anecdotal reports say that ext2 can be faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
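# Illustrative call (hypothetical paths and mode; not part of the original
# module): map a read-only cache entry into a task directory, linking when
# safe and copying otherwise:
#   with fs.open(u'/cache/abc123', 'rb') as src:
#     putfile(src, u'/task/out.bin', file_mode=0500)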


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
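# Illustrative round trip through the two generators above (not part of the
# original module):
#   chunks = ['hello ', 'world']
#   assert ''.join(zip_decompress(zip_compress(chunks))) == 'hello world'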


def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; strip it so that entries in
  # ALREADY_COMPRESSED_TYPES (which have no dots) match.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
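# Illustrative values (not part of the original module):
#   get_zip_compression_level(u'photo.png')  # -> 0, already compressed
#   get_zip_compression_level(u'report.txt')  # -> 7, worth compressing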


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
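# Illustrative sketch (hypothetical paths; not part of the original module):
# for files ['a/b/c.txt', 'd.txt'] the directories created under
# |base_directory| are 'a' then 'a/b'; the empty dirname of 'd.txt' is skipped.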


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handler table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of the isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through a verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
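# Illustrative sketch (not part of the original module): for 100 items the
# generator above yields batches of 20, 20, 50 and 10 items, largest files
# first, so items most likely to have changed are queried early.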


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts an asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies that |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
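# Illustrative sketch (not part of the original module): wrapping a fetched
# stream so a truncated download raises IOError instead of being cached:
#   verifier = FetchStreamVerifier(iter(['hello ', 'world']), 11)
#   assert ''.join(verifier.run()) == 'hello world'  # 11 bytes, sizes match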


class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # _evicted is a list of sizes, so record the size of the content.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest
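# Illustrative usage (hypothetical digest; not part of the original module):
#   cache = MemoryCache()
#   digest = cache.write('abc123', ['hello ', 'world'])
#   assert cache.getfileobj(digest).read() == 'hello world'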


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
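# Illustrative policies (hypothetical values; not part of the original
# module): keep the cache under 20GiB and 50000 items, and leave at least
# 2GiB of disk free:
#   policies = CachePolicies(
#       max_cache_size=20 * 1024 ** 3,
#       min_free_space=2 * 1024 ** 3,
#       max_items=50000)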


class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim=True):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
        It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim=trim)

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last
    # cleanup() call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001172 def touch(self, digest, size):
1173 """Verifies an actual file is valid.
1174
1175 Note that is doesn't compute the hash so it could still be corrupted if the
1176 file size didn't change.
1177
1178 TODO(maruel): More stringent verification while keeping the check fast.
1179 """
1180 # Do the check outside the lock.
1181 if not is_valid_file(self._path(digest), size):
1182 return False
1183
1184 # Update it's LRU position.
1185 with self._lock:
1186 if digest not in self._lru:
1187 return False
1188 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001189 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001190 return True
1191
1192 def evict(self, digest):
1193 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001194 # Do not check for 'digest == self._protected' since it could be because
maruel083fa552016-04-08 14:38:01 -07001195 # the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001196 self._lru.pop(digest)
1197 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1198
tansell9e04a8d2016-07-28 09:31:59 -07001199 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001200 try:
tansell9e04a8d2016-07-28 09:31:59 -07001201 f = fs.open(self._path(digest), 'rb')
1202 with self._lock:
1203 self._used.append(self._lru[digest])
1204 return f
nodir445097b2016-06-03 22:50:26 -07001205 except IOError:
1206 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001207
1208 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001209 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001210 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001211 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001212 path = self._path(digest)
1213 # A stale broken file may remain. It is possible for the file to have write
1214 # access bit removed which would cause the file_write() call to fail to open
1215 # in write mode. Take no chance here.
1216 file_path.try_remove(path)
1217 try:
1218 size = file_write(path, content)
1219 except:
1220 # There are two possible places were an exception can occur:
1221 # 1) Inside |content| generator in case of network or unzipping errors.
1222 # 2) Inside file_write itself in case of disk IO errors.
1223 # In any case delete an incomplete file and propagate the exception to
1224 # caller, it will be logged there.
1225 file_path.try_remove(path)
1226 raise
1227 # Make the file read-only in the cache. This has a few side-effects since
1228 # the file node is modified, so every directory entry pointing to this file
1229 # becomes read-only. It's fine here because it is a new file.
1230 file_path.set_read_only(path, True)
1231 with self._lock:
1232 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001233 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001234
nodirf33b8d62016-10-26 22:34:58 -07001235 def get_oldest(self):
1236 """Returns digest of the LRU item or None."""
1237 try:
1238 return self._lru.get_oldest()[0]
1239 except KeyError:
1240 return None
1241
1242 def get_timestamp(self, digest):
1243 """Returns timestamp of last use of an item.
1244
1245 Raises KeyError if item is not found.
1246 """
1247 return self._lru.get_timestamp(digest)
1248
1249 def trim(self):
1250 """Forces retention policies."""
1251 with self._lock:
1252 self._trim()
1253
1254 def _load(self, trim):
maruel2e8d0f52016-07-16 07:51:29 -07001255 """Loads state of the cache from json file.
1256
1257 If cache_dir does not exist on disk, it is created.
1258 """
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001259 self._lock.assert_locked()
1260
maruel2e8d0f52016-07-16 07:51:29 -07001261 if not fs.isfile(self.state_file):
1262 if not fs.isdir(self.cache_dir):
1263 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001264 else:
maruel2e8d0f52016-07-16 07:51:29 -07001265 # Load state of the cache.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001266 try:
1267 self._lru = lru.LRUDict.load(self.state_file)
1268 except ValueError as err:
1269 logging.error('Failed to load cache state: %s' % (err,))
1270 # Don't want to keep broken state file.
1271 file_path.try_remove(self.state_file)
nodirf33b8d62016-10-26 22:34:58 -07001272 if trim:
1273 self._trim()
maruel2e8d0f52016-07-16 07:51:29 -07001274 # We want the initial cache size after trimming, i.e. what is readily
1275 # available.
1276 self._initial_number_items = len(self._lru)
1277 self._initial_size = sum(self._lru.itervalues())
1278 if self._evicted:
1279 logging.info(
1280 'Trimming evicted items with the following sizes: %s',
1281 sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001282
1283 def _save(self):
1284 """Saves the LRU ordering."""
1285 self._lock.assert_locked()
1286 if sys.platform != 'win32':
1287 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001288 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001289 # Necessary otherwise the file can't be created.
1290 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001291 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001292 file_path.set_read_only(self.state_file, False)
1293 self._lru.save(self.state_file)
1294
1295 def _trim(self):
1296 """Trims anything we don't know, make sure enough free space exists."""
1297 self._lock.assert_locked()
1298
1299 # Ensure maximum cache size.
1300 if self.policies.max_cache_size:
1301 total_size = sum(self._lru.itervalues())
1302 while total_size > self.policies.max_cache_size:
maruel2e8d0f52016-07-16 07:51:29 -07001303 total_size -= self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001304
1305 # Ensure maximum number of items in the cache.
1306 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1307 for _ in xrange(len(self._lru) - self.policies.max_items):
maruel2e8d0f52016-07-16 07:51:29 -07001308 self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001309
1310 # Ensure enough free space.
1311 self._free_disk = file_path.get_free_space(self.cache_dir)
kjlubickea9abf02016-06-01 09:34:33 -07001312 trimmed_due_to_space = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001313 while (
1314 self.policies.min_free_space and
1315 self._lru and
1316 self._free_disk < self.policies.min_free_space):
kjlubickea9abf02016-06-01 09:34:33 -07001317 trimmed_due_to_space += 1
maruel2e8d0f52016-07-16 07:51:29 -07001318 self._remove_lru_file(True)
kjlubickea9abf02016-06-01 09:34:33 -07001319
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001320 if trimmed_due_to_space:
1321 total_usage = sum(self._lru.itervalues())
1322 usage_percent = 0.
1323 if total_usage and self.policies.max_cache_size:
kjlubickea9abf02016-06-01 09:34:33 -07001324 usage_percent = 100. * float(total_usage) / self.policies.max_cache_size
1325
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001326 logging.warning(
kjlubickea9abf02016-06-01 09:34:33 -07001327 'Trimmed %s file(s) due to not enough free disk space: %.1fKiB free,'
1328 ' %.1fKiB cache (%.1f%% of its maximum capacity of %.1fKiB)',
1329 trimmed_due_to_space,
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001330 self._free_disk / 1024.,
1331 total_usage / 1024.,
kjlubickea9abf02016-06-01 09:34:33 -07001332 usage_percent,
1333 self.policies.max_cache_size / 1024.)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001334 self._save()
1335
1336 def _path(self, digest):
1337 """Returns the path to one item."""
1338 return os.path.join(self.cache_dir, digest)
1339
maruel2e8d0f52016-07-16 07:51:29 -07001340 def _remove_lru_file(self, allow_protected):
1341 """Removes the lastest recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001342 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001343 try:
nodireabc11c2016-10-18 16:37:28 -07001344 digest, (size, _) = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001345 if not allow_protected and digest == self._protected:
1346 raise Error('Not enough space to map the whole isolated tree')
maruel083fa552016-04-08 14:38:01 -07001347 except KeyError:
1348 raise Error('Nothing to remove')
nodireabc11c2016-10-18 16:37:28 -07001349 digest, (size, _) = self._lru.pop_oldest()
kjlubickea9abf02016-06-01 09:34:33 -07001350 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001351 self._delete_file(digest, size)
1352 return size
1353
1354 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1355 """Adds an item into LRU cache marking it as a newest one."""
1356 self._lock.assert_locked()
1357 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001358 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001359 self._added.append(size)
1360 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001361 self._free_disk -= size
1362 # Do a quicker version of self._trim(). It only enforces free disk space,
1363 # not cache size limits. It doesn't actually query the real free disk
1364 # space; it only uses its cached value. self._trim() will be called later
1365 # to do the real trimming, but doing this quick version here makes it
1366 # possible to map an isolated tree that is larger than the current amount
1367 # of free disk space when the cache size is already large.
1368 while (
1369 self.policies.min_free_space and
1370 self._lru and
1371 self._free_disk < self.policies.min_free_space):
maruel2e8d0f52016-07-16 07:51:29 -07001372 self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001373
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001374 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1375 """Deletes cache file from the file system."""
1376 self._lock.assert_locked()
1377 try:
1378 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001379 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001380 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001381 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001382 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001383 except OSError as e:
1384 logging.error('Error attempting to delete file %s:\n%s', digest, e)
1385
1386
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001387class IsolatedBundle(object):
1388 """Fetched and parsed .isolated file with all dependencies."""
1389
Vadim Shtayura3148e072014-09-02 18:51:52 -07001390 def __init__(self):
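# The command to run, as read from the *.isolated file(s).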
1391 self.command = []
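# Maps relative file path -> file properties, merged across includes.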
1392 self.files = {}
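# Whether the mapped files must be read-only; None when unspecified.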
1393 self.read_only = None
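# Working directory for the command, relative to the mapped root.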
1394 self.relative_cwd = None
1395 # The main .isolated file, an IsolatedFile instance.
1396 self.root = None
1397
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001398 def fetch(self, fetch_queue, root_isolated_hash, algo):
1399 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001400
1401 It enables support for "included" .isolated files. They are processed in
1402 strict order but fetched asynchronously from the cache. This is important so
1403 that a file in an included .isolated file that is overridden by an embedding
1404 .isolated file is not fetched needlessly. The includes are fetched in one
1405 pass and the files are fetched as soon as all the ones on the left side
1406 of the tree have been fetched.
1407
1408 The prioritization is very important here for nested .isolated files.
1409 'includes' have the highest priority and the algorithm is optimized for both
1410 deep and wide trees. A deep one is a long link of .isolated files referenced
1411 one at a time by one item in 'includes'. A wide one has a large number of
1412 'includes' in a single .isolated file. 'left' is defined as an included
1413 .isolated file earlier in the 'includes' list. So the order of the elements
1414 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001415
1416 As a side effect this method starts asynchronous fetch of all data files
1417 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1418 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001419 """
1420 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1421
1422 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1423 pending = {}
1424 # Set of hashes of already retrieved items to refuse recursive includes.
1425 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001426 # Set of IsolatedFiles whose data files have already been fetched.
1427 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001428
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001429 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001430 h = isolated_file.obj_hash
1431 if h in seen:
1432 raise isolated_format.IsolatedError(
1433 'IsolatedFile %s is retrieved recursively' % h)
1434 assert h not in pending
1435 seen.add(h)
1436 pending[h] = isolated_file
1437 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1438
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001439 # Start fetching root *.isolated file (single file, not the whole bundle).
1440 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001441
1442 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001443 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001444 item_hash = fetch_queue.wait(pending)
1445 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001446 with fetch_queue.cache.getfileobj(item_hash) as f:
1447 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001448
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001449 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001450 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001451 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001452
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001453 # Always fetch *.isolated files in traversal order, waiting if necessary
1454 # until next to-be-processed node loads. "Waiting" is done by yielding
1455 # back to the outer loop, that waits until some *.isolated is loaded.
1456 for node in isolated_format.walk_includes(self.root):
1457 if node not in processed:
1458 # Not visited, and not yet loaded -> wait for it to load.
1459 if not node.is_loaded:
1460 break
1461 # Not visited and loaded -> process it and continue the traversal.
1462 self._start_fetching_files(node, fetch_queue)
1463 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001464
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001465 # All *.isolated files should be processed by now and only them.
1466 all_isolateds = set(isolated_format.walk_includes(self.root))
1467 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001468
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001469 # Extract 'command' and other bundle properties.
1470 for node in isolated_format.walk_includes(self.root):
1471 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001472 self.relative_cwd = self.relative_cwd or ''
1473
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001474 def _start_fetching_files(self, isolated, fetch_queue):
1475 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001476
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001477 Modifies self.files.
1478 """
maruel10bea7b2016-12-07 05:03:49 -08001479 files = isolated.data.get('files', {})
1480 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1481 for filepath, properties in files.iteritems():
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001482 # Root isolated has priority on the files being mapped. In particular,
1483 # overridden files must not be fetched.
1484 if filepath not in self.files:
1485 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001486
1487 # Make sure that if the isolated tree is read-only, the mode doesn't have
1488 # write bits.
1489 if 'm' in properties and self.read_only:
1490 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1491
1492 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001493 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001494 fetch_queue.add(
1495 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1496
1497 def _update_self(self, node):
1498 """Extracts bundle global parameters from loaded *.isolated file.
1499
1500 Will be called with each loaded *.isolated file in order of traversal of
1501 isolated include graph (see isolated_format.walk_includes).
1502 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001503 # Grabs properties.
1504 if not self.command and node.data.get('command'):
1505 # Ensure paths are correctly separated on Windows.
1506 self.command = node.data['command']
1507 if self.command:
1508 self.command[0] = self.command[0].replace('/', os.path.sep)
1509 self.command = tools.fix_python_path(self.command)
1510 if self.read_only is None and node.data.get('read_only') is not None:
1511 self.read_only = node.data['read_only']
1512 if (self.relative_cwd is None and
1513 node.data.get('relative_cwd') is not None):
1514 self.relative_cwd = node.data['relative_cwd']
1515
1516
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001517def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001518 """Returns Storage class that can upload and download from |namespace|.
1519
1520 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001521 url: URL of the isolate service providing shared cloud-based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001522 namespace: isolate namespace to operate in, also defines hashing and
1523 compression scheme used, i.e. namespace names that end with '-gzip'
1524 store compressed data.
1525
1526 Returns:
1527 Instance of Storage.
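
Example (a minimal usage sketch; the server URL is a placeholder):
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    storage.upload_items([])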
1528 """
aludwin81178302016-11-30 17:18:49 -08001529 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001530
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001531
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001532def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001533 """Uploads the given tree to the given url.
1534
1535 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001536 base_url: The URL of the isolate server to upload to.
1537 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001538 namespace: The namespace to use on the server.
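
Example (a sketch; the URL, path and digest are placeholders):
  upload_tree(
      'https://isolate.example.com',
      [(u'/abs/path/foo', {'h': '<digest>', 's': 123})],
      'default-gzip')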
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001539 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001540 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001541 # Filter out symlinks, since they are not represented by items on isolate
1542 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001543 items = []
1544 seen = set()
1545 skipped = 0
1546 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001547 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001548 if 'l' not in metadata and filepath not in seen:
1549 seen.add(filepath)
1550 item = FileItem(
1551 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001552 digest=metadata['h'],
1553 size=metadata['s'],
1554 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001555 items.append(item)
1556 else:
1557 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001558
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001559 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001560 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001561 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001562
1563
maruel4409e302016-07-19 14:25:51 -07001564def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001565 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001566
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001567 Arguments:
1568 isolated_hash: hash of the root *.isolated file.
1569 storage: Storage class that communicates with isolate storage.
1570 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001571 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001572 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001573
1574 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001575 IsolatedBundle object that holds details about loaded *.isolated file.
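
Example (a sketch; the URL and digest are placeholders):
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    bundle = fetch_isolated(
        '<root isolated digest>', storage, MemoryCache(), u'/tmp/out', False)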
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001576 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001577 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001578 'fetch_isolated(%s, %s, %s, %s, %s)',
1579 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001580 # Hash algorithm to use, defined by namespace |storage| is using.
1581 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001582 with cache:
1583 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001584 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001585
1586 with tools.Profiler('GetIsolateds'):
1587 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001588 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001589 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07001590 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001591 try:
maruel1ceb3872015-10-14 06:10:44 -07001592 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001593 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001594 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001595 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1596 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001597
1598 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001599 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001600
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001601 with tools.Profiler('GetRest'):
1602 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001603 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001604 create_directories(outdir, bundle.files)
1605 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001606
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001607 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001608 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001609 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001610
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001611 # Multimap: digest -> list of pairs (path, props).
1612 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001613 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001614 if 'h' in props:
1615 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001616
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001617 # Now block on the remaining files to be downloaded and mapped.
1618 logging.info('Retrieving remaining files (%d of them)...',
1619 fetch_queue.pending_count)
1620 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001621 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001622 while remaining:
1623 detector.ping()
1624
1625 # Wait for any item to finish fetching to cache.
1626 digest = fetch_queue.wait(remaining)
1627
tansell9e04a8d2016-07-28 09:31:59 -07001628 # Create the files in the destination using item in cache as the
1629 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001630 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001631 fullpath = os.path.join(outdir, filepath)
1632
1633 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001634 filetype = props.get('t', 'basic')
1635
1636 if filetype == 'basic':
1637 file_mode = props.get('m')
1638 if file_mode:
1639 # Ignore all bits apart from the user
1640 file_mode &= 0700
1641 putfile(
1642 srcfileobj, fullpath, file_mode,
1643 use_symlink=use_symlinks)
1644
tansell26de79e2016-11-13 18:41:11 -08001645 elif filetype == 'tar':
1646 basedir = os.path.dirname(fullpath)
1647 with tarfile.TarFile(fileobj=srcfileobj) as extractor:
1648 for ti in extractor:
1649 if not ti.isfile():
1650 logging.warning(
1651 'Path(%r) is nonfile (%s), skipped',
1652 ti.name, ti.type)
1653 continue
1654 fp = os.path.normpath(os.path.join(basedir, ti.name))
1655 if not fp.startswith(basedir):
1656 logging.error(
1657 'Path(%r) is outside root directory, skipped',
1658 fp)
continue
1659 ifd = extractor.extractfile(ti)
1660 file_path.ensure_tree(os.path.dirname(fp))
1661 putfile(ifd, fp, 0700, ti.size)
1662
tanselle4288c32016-07-28 09:45:40 -07001663 elif filetype == 'ar':
1664 basedir = os.path.dirname(fullpath)
1665 extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
1666 for ai, ifd in extractor:
1667 fp = os.path.normpath(os.path.join(basedir, ai.name))
tansell26de79e2016-11-13 18:41:11 -08001668 if not fp.startswith(basedir):
1669 logging.error(
1670 'Path(%r) is outside root directory, skipped',
1671 fp)
continue
tanselle4288c32016-07-28 09:45:40 -07001672 file_path.ensure_tree(os.path.dirname(fp))
1673 putfile(ifd, fp, 0700, ai.size)
1674
1675 else:
1676 raise isolated_format.IsolatedError(
1677 'Unknown file type %r' % filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001678
1679 # Report progress.
1680 duration = time.time() - last_update
1681 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1682 msg = '%d files remaining...' % len(remaining)
1683 print msg
1684 logging.info(msg)
1685 last_update = time.time()
1686
1687 # The cache could evict items we just tried to fetch; that's a fatal error.
1688 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001689 raise isolated_format.MappingError(
1690 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001691 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001692
1693
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001694def directory_to_metadata(root, algo, blacklist):
1695 """Returns the FileItem list and .isolated metadata for a directory."""
1696 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001697 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001698 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001699 metadata = {
1700 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001701 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001702 for relpath in paths
1703 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001704 for v in metadata.itervalues():
1705 v.pop('t')
1706 items = [
1707 FileItem(
1708 path=os.path.join(root, relpath),
1709 digest=meta['h'],
1710 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001711 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001712 for relpath, meta in metadata.iteritems() if 'h' in meta
1713 ]
1714 return items, metadata
1715
1716
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001717def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001718 """Stores every entries and returns the relevant data.
1719
1720 Arguments:
1721 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001722 files: list of file paths to upload. If a directory is specified, a
1723 .isolated file is created and its hash is returned.
1724 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001725
1726 Returns:
1727 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1728 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001729 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001730 assert all(isinstance(i, unicode) for i in files), files
1731 if len(files) != len(set(map(os.path.abspath, files))):
1732 raise Error('Duplicate entries found.')
1733
maruel064c0a32016-04-05 11:47:15 -07001734 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001735 results = []
1736 # The temporary directory is only created as needed.
1737 tempdir = None
1738 try:
1739 # TODO(maruel): Yield the files to a worker thread.
1740 items_to_upload = []
1741 for f in files:
1742 try:
1743 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001744 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001745 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001746 items, metadata = directory_to_metadata(
1747 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001748
1749 # Create the .isolated file.
1750 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001751 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1752 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001753 os.close(handle)
1754 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001755 'algo':
1756 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001757 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001758 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001759 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001760 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001761 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001762 items_to_upload.extend(items)
1763 items_to_upload.append(
1764 FileItem(
1765 path=isolated,
1766 digest=h,
maruel12e30012015-10-09 11:55:35 -07001767 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001768 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001769 results.append((h, f))
1770
maruel12e30012015-10-09 11:55:35 -07001771 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001772 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001773 items_to_upload.append(
1774 FileItem(
1775 path=filepath,
1776 digest=h,
maruel12e30012015-10-09 11:55:35 -07001777 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001778 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001779 results.append((h, f))
1780 else:
1781 raise Error('%s is neither a file nor a directory.' % f)
1782 except OSError:
1783 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07001784 uploaded = storage.upload_items(items_to_upload)
1785 cold = [i for i in items_to_upload if i in uploaded]
1786 hot = [i for i in items_to_upload if i not in uploaded]
1787 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001788 finally:
maruel12e30012015-10-09 11:55:35 -07001789 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001790 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001791
1792
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001793def archive(out, namespace, files, blacklist):
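"""Archives the given files to the isolate server and prints a
'<hash> <path>' line per entry.

Reads the list of files from stdin when files == ['-'].
"""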
1794 if files == ['-']:
1795 files = sys.stdin.readlines()
1796
1797 if not files:
1798 raise Error('Nothing to upload')
1799
1800 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001801 blacklist = tools.gen_blacklist(blacklist)
1802 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001803 # Ignore stats.
1804 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001805 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1806
1807
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001808@subcommand.usage('<file1..fileN> or - to read from stdin')
1809def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001810 """Archives data to the server.
1811
1812 If a directory is specified, a .isolated file is created and the whole
1813 directory is uploaded. Then this .isolated file can be included in another
1814 one to run commands.
1815
1816 The command outputs each processed file with its content hash. For
1817 directories, the .isolated generated for the directory is listed as the
1818 directory entry itself.
1819 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001820 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001821 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001822 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001823 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001824 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001825 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001826 except Error as e:
1827 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001828 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001829
1830
1831def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001832 """Download data from the server.
1833
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001834 It can either download individual files or a complete tree from a .isolated
1835 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001836 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001837 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001838 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001839 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001840 help='hash of an isolated file, .isolated file content is discarded, use '
1841 '--file if you need it')
1842 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001843 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1844 help='hash and destination of a file, can be used multiple times')
1845 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001846 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001847 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001848 parser.add_option(
1849 '--use-symlinks', action='store_true',
1850 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001851 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001852 options, args = parser.parse_args(args)
1853 if args:
1854 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001855
nodir55be77b2016-05-03 09:39:57 -07001856 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001857 if bool(options.isolated) == bool(options.file):
1858 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001859 if not options.cache and options.use_symlinks:
1860 parser.error('--use-symlinks require the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001861
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001862 cache = process_cache_options(options)
maruel2e8d0f52016-07-16 07:51:29 -07001863 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07001864 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001865 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001866 if (fs.isfile(options.target) or
1867 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001868 parser.error(
1869 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001870 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001871 # Fetching individual files.
1872 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001873 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001874 channel = threading_utils.TaskChannel()
1875 pending = {}
1876 for digest, dest in options.file:
1877 pending[digest] = dest
1878 storage.async_fetch(
1879 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001880 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001881 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001882 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001883 functools.partial(file_write, os.path.join(options.target, dest)))
1884 while pending:
1885 fetched = channel.pull()
1886 dest = pending.pop(fetched)
1887 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001888
Vadim Shtayura3172be52013-12-03 12:49:05 -08001889 # Fetching whole isolated tree.
1890 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001891 with cache:
1892 bundle = fetch_isolated(
1893 isolated_hash=options.isolated,
1894 storage=storage,
1895 cache=cache,
maruel4409e302016-07-19 14:25:51 -07001896 outdir=options.target,
1897 use_symlinks=options.use_symlinks)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001898 if bundle.command:
1899 rel = os.path.join(options.target, bundle.relative_cwd)
1900 print('To run this test please run from the directory %s:' % rel)
1902 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001903
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001904 return 0
1905
1906
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001907def add_archive_options(parser):
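"""Adds the --blacklist option to parser."""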
1908 parser.add_option(
1909 '--blacklist',
1910 action='append', default=list(DEFAULT_BLACKLIST),
1911 help='List of regexp to use as blacklist filter when uploading '
1912 'directories')
1913
1914
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001915def add_isolate_server_options(parser):
1916 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001917 parser.add_option(
1918 '-I', '--isolate-server',
1919 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001920 help='URL of the Isolate Server to use. Defaults to the environment '
1921 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1922 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001923 parser.add_option(
aludwin81178302016-11-30 17:18:49 -08001924 '--is-grpc', action='store_true', help='Communicate to Isolate via gRPC')
1925 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001926 '--namespace', default='default-gzip',
1927 help='The namespace to use on the Isolate Server, default: %default')
1928
1929
nodir55be77b2016-05-03 09:39:57 -07001930def process_isolate_server_options(
1931 parser, options, set_exception_handler, required):
1932 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001933
1934 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001935 """
1936 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001937 if required:
1938 parser.error('--isolate-server is required.')
1939 return
1940
aludwin81178302016-11-30 17:18:49 -08001941 if options.is_grpc:
1942 isolate_storage.set_storage_api_class(isolate_storage.IsolateServerGrpc)
1943 else:
1944 try:
1945 options.isolate_server = net.fix_url(options.isolate_server)
1946 except ValueError as e:
1947 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001948 if set_exception_handler:
1949 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001950 try:
1951 return auth.ensure_logged_in(options.isolate_server)
1952 except ValueError as e:
1953 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001954
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001955
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001956def add_cache_options(parser):
1957 cache_group = optparse.OptionGroup(parser, 'Cache management')
1958 cache_group.add_option(
1959 '--cache', metavar='DIR',
1960 help='Directory to keep a local cache of the files. Accelerates download '
1961 'by reusing already downloaded files. Default=%default')
1962 cache_group.add_option(
1963 '--max-cache-size',
1964 type='int',
1965 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001966 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001967 help='Trim if the cache gets larger than this value, default=%default')
1968 cache_group.add_option(
1969 '--min-free-space',
1970 type='int',
1971 metavar='NNN',
1972 default=2*1024*1024*1024,
1973 help='Trim if disk free space becomes lower than this value, '
1974 'default=%default')
1975 cache_group.add_option(
1976 '--max-items',
1977 type='int',
1978 metavar='NNN',
1979 default=100000,
1980 help='Trim if more than this number of items are in the cache '
1981 'default=%default')
1982 parser.add_option_group(cache_group)
1983
1984
nodirf33b8d62016-10-26 22:34:58 -07001985def process_cache_options(options, trim=True):
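"""Returns a DiskCache when --cache is set, a MemoryCache otherwise."""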
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001986 if options.cache:
1987 policies = CachePolicies(
1988 options.max_cache_size, options.min_free_space, options.max_items)
1989
1990 # |options.cache| path may not exist until DiskCache() instance is created.
1991 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001992 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001993 policies,
nodirf33b8d62016-10-26 22:34:58 -07001994 isolated_format.get_hash_algo(options.namespace),
1995 trim=trim)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001996 else:
1997 return MemoryCache()
1998
1999
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002000class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002001 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002002 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002003 self,
2004 version=__version__,
2005 prog=os.path.basename(sys.modules[__name__].__file__),
2006 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002007 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002008
2009 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002010 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002011 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002012 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002013 return options, args
2014
2015
2016def main(args):
2017 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002018 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002019
2020
2021if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07002022 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002023 fix_encoding.fix_encoding()
2024 tools.disable_buffering()
2025 colorama.init()
maruel4409e302016-07-19 14:25:51 -07002026 file_path.enable_symlink()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002027 sys.exit(main(sys.argv[1:]))