#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import Queue
import re
import signal
import stat
import sys
import tarfile
import tempfile
import threading
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
import local_caching


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). The first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are
# a trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing"
# case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


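# A minimal usage sketch (with a hypothetical 'paths' input) of how the
# blacklist above is meant to be applied: a path is kept only if it matches
# none of the regexps.
def _example_apply_blacklist(paths):
  blacklist = [re.compile(p) for p in DEFAULT_BLACKLIST]
  return [p for p in paths if not any(b.match(p) for b in blacklist)]

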
class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


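# Minimal sketch: stream a file through file_read() to count its bytes
# without loading it whole into memory ('path' is a caller-supplied path).
def _example_file_read_size(path):
  return sum(len(chunk) for chunk in file_read(path))

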
def fileobj_path(fileobj):
  """Return file system path for file like object or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created by something like open("test.txt")
  # (e.g. by a function outside our control, like the standard library), name
  # will end up being a str. We want all our paths to be unicode objects, so
  # we decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


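# In-memory round trip sketch for fileobj_copy(); cStringIO stands in for
# real file objects purely for illustration.
def _example_fileobj_copy():
  import cStringIO
  src = cStringIO.StringIO('hello world')
  dst = cStringIO.StringIO()
  fileobj_copy(dst, src)
  assert dst.getvalue() == 'hello world'

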
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


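# Minimal sketch of the copy path of putfile(): the in-memory source has no
# 'name' attribute, so fileobj_path() returns None and the content is copied.
def _example_putfile():
  import cStringIO
  dstpath = os.path.join(tempfile.mkdtemp(), u'example.txt')
  putfile(cStringIO.StringIO('payload'), dstpath, file_mode=0400)
  assert fs.exists(dstpath)

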
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


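# Round-trip sketch for the two helpers above: compressing and then
# decompressing a chunked payload must reproduce the original bytes.
def _example_zip_round_trip():
  payload = ['chunk one ', 'chunk two '] * 100
  compressed = zip_compress(iter(payload))
  assert ''.join(zip_decompress(compressed)) == ''.join(payload)

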
def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


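# Combined sketch: derive a compression level from the filename, then
# compress that file's content stream with it.
def _example_compress_file(path):
  level = _get_zip_compression_level(path)
  return ''.join(zip_compress(file_read(path), level))

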
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


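# Layout sketch with a hypothetical two-entry mapping in .isolated format:
# 'h'/'s' entries are regular files, 'l' entries are symlinks.
def _example_layout(base_directory):
  files = {
      u'dir/data.txt': {'h': 'abc123', 's': 7},
      u'dir/link.txt': {'l': u'data.txt'},
  }
  create_directories(base_directory, files)
  _create_symlinks(base_directory, files.iteritems())

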
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


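# Construction sketch for both item types; sha1 is only an assumption here,
# the real algorithm comes from the server namespace.
def _example_items():
  import hashlib
  path = os.path.join(tempfile.mkdtemp(), u'example.bin')
  with fs.open(path, 'wb') as f:
    f.write('raw bytes')
  return [BufferItem('some bytes', hashlib.sha1),
          FileItem(path, hashlib.sha1)]

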
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (this is true for Python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                        Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                  Thread #2
         |         |
         v         v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                              Thread #3
         |
         v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represents data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only first isolate_storage.Item that matches it.
      # All other items are just indistinguishable copies from the point of
      # view of isolate server (it doesn't care about paths at all, only
      # content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. That is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)


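# Verification sketch over an in-memory stream; sha1 stands in for the
# namespace's hash algorithm.
def _example_verify_stream():
  import hashlib
  verifier = FetchStreamVerifier(
      iter(['abc', 'def']), hashlib.sha1,
      hashlib.sha1('abcdef').hexdigest(), 6)
  assert ''.join(verifier.run()) == 'abcdef'

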
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded iff
        filter_cb(filepath) returns True.
    """

    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance that can upload and download from |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))


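# End-to-end upload sketch, assuming isolate_storage.ServerRef is constructed
# from (url, namespace); the URL and namespace values are hypothetical.
def _example_upload(paths):
  server_ref = isolate_storage.ServerRef(
      'https://isolate.example.com', 'default-gzip')
  with get_storage(server_ref) as storage:
    items = (FileItem(p, server_ref.hash_algo) for p in paths)
    return storage.upload_items(items)

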
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001099def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1100 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001101 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001102
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001103 Arguments:
1104 isolated_hash: hash of the root *.isolated file.
1105 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001106 cache: ContentAddressedCache class that knows how to store and map files
1107 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001108 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001109 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001110 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001111
1112 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001113 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001114 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001115 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001116 'fetch_isolated(%s, %s, %s, %s, %s)',
1117 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001118 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001119 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001120 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001121 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001122
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001123 with tools.Profiler('GetIsolateds'):
1124 # Optionally support local files by manually adding them to cache.
1125 if not isolated_format.is_valid_hash(isolated_hash, algo):
1126 logging.debug('%s is not a valid hash, assuming a file '
1127 '(algo was %s, hash size was %d)',
1128 isolated_hash, algo(), algo().digest_size)
1129 path = unicode(os.path.abspath(isolated_hash))
1130 try:
1131 isolated_hash = fetch_queue.inject_local_file(path, algo)
1132 except IOError as e:
1133 raise isolated_format.MappingError(
1134 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1135 'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated and start loading rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.iteritems())

    # Ensure working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.iteritems():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
      while remaining:
        detector.ping()

        # Wait for any item to finish fetching to cache.
        digest = fetch_queue.wait()

        # Create the files in the destination using item in cache as the
        # source.
        for filepath, props in remaining.pop(digest):
          fullpath = os.path.join(outdir, filepath)

          with cache.getfileobj(digest) as srcfileobj:
            filetype = props.get('t', 'basic')

            if filetype == 'basic':
              # Ignore all bits apart from the user.
              file_mode = (props.get('m') or 0500) & 0700
              if bundle.read_only:
                # Enforce read-only if the root bundle does.
                file_mode &= 0500
              putfile(
                  srcfileobj, fullpath, file_mode,
                  use_symlink=use_symlinks)

            elif filetype == 'tar':
              basedir = os.path.dirname(fullpath)
              with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
                for ti in t:
                  if not ti.isfile():
                    logging.warning(
                        'Path(%r) is nonfile (%s), skipped',
                        ti.name, ti.type)
                    continue
                  # Handle files created on Windows fetched on POSIX and the
                  # reverse.
                  other_sep = '/' if os.path.sep == '\\' else '\\'
                  name = ti.name.replace(other_sep, os.path.sep)
                  fp = os.path.normpath(os.path.join(basedir, name))
                  if not fp.startswith(basedir):
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                    # Skip entries that would escape the root directory.
                    continue
                  ifd = t.extractfile(ti)
                  file_path.ensure_tree(os.path.dirname(fp))
                  file_mode = ti.mode & 0700
                  if bundle.read_only:
                    # Enforce read-only if the root bundle does.
                    file_mode &= 0500
                  putfile(ifd, fp, file_mode, ti.size)

            else:
              raise isolated_format.IsolatedError(
                  'Unknown file type %r' % filetype)

        # Report progress.
        duration = time.time() - last_update
        if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
          msg = '%d files remaining...' % len(remaining)
          sys.stdout.write(msg + '\n')
          sys.stdout.flush()
          logging.info(msg)
          last_update = time.time()
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # Cache could evict some items we just tried to fetch; it's a fatal error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%d bytes, %d items; %s bytes free') % (
          cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle


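# A sketch of how fetch_isolated() is driven, mirroring CMDdownload below;
# the isolated hash and the output directory are hypothetical placeholders:
#
#   cache = local_caching.MemoryContentAddressedCache()
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'deadbeef0123...', storage=storage, cache=cache,
#         outdir=u'/tmp/out', use_symlinks=False)
#   # bundle.command and bundle.relative_cwd describe how to run the result.

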
def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      meta = isolated_format.file_to_metadata(filepath, 0, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta


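# Shape of the tuples yielded by _directory_to_metadata(); the paths and
# metadata values here are hypothetical:
#
#   (FileItem(...), u'foo/bar.txt', {'h': <digest>, 'm': <mode>, ...})
#   (None, u'foo/link', <symlink metadata>)  # symlinks carry no FileItem

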
def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total: %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total if total else 0,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total if total else 0,
      cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file for the directory.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)


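# For reference, the in-memory .isolated payload built above looks roughly
# like this; 'h' and 'm' are the hash and mode keys used elsewhere in this
# file, and the concrete values are hypothetical:
#
#   {
#     'algo': 'sha-1',
#     'files': {u'foo/bar.txt': {'h': u'<digest>', 'm': 0644, ...}},
#     'version': isolated_format.ISOLATED_FILE_VERSION,
#   }

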
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  uploaded_digests = set()
  def _upload_items():
    uploaded = storage.upload_items(channel)
    uploaded_digests.update(f.digest for f in uploaded)
  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, unicode), repr(f)
      if f in results:
        # Duplicate.
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItem may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot


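# A sketch of how archive_files_to_storage() is used, mirroring CMDarchive
# below; the directory path is a hypothetical placeholder:
#
#   blacklist = tools.gen_blacklist(DEFAULT_BLACKLIST)
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/path/to/dir'], blacklist)
#   # cold holds the items that actually had to be uploaded.
#   _print_upload_stats(cold + hot, cold)

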
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0


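# Example invocation, assuming this file is run directly; the server URL is a
# hypothetical placeholder:
#
#   python isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip ./out/my_tests
#
# One '<hash> <path>' line is printed per argument; for a directory, the hash
# is that of the generated .isolated file.

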
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # |rel| already includes |options.target|, so don't join it twice.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


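# Example invocations, assuming this file is run directly; the server URL and
# the hash are hypothetical placeholders:
#
#   python isolateserver.py download -I https://isolate.example.com \
#       -s deadbeef0123... -t /tmp/out
#   python isolateserver.py download -I https://isolate.example.com \
#       -f deadbeef0123... foo.txt

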
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    server_ref = isolate_storage.ServerRef(
        options.isolate_server, options.namespace)
    return local_caching.DiskContentAddressedCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        server_ref.hash_algo(),  # pylint: disable=not-callable
        trim,
        **kwargs)
  else:
    return local_caching.MemoryContentAddressedCache()


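# A sketch tying add_cache_options() and process_cache_options() together,
# following the flow used by CMDdownload above:
#
#   add_cache_options(parser)
#   options, args = parser.parse_args(args)
#   cache = process_cache_options(options, trim=True)
#   cache.cleanup()
#   ...  # fetch files into the cache, then cache.trim() to apply policies.

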
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))