#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.6.0'

import base64
import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tempfile
import threading
import time
import types
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)

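# Illustration (hypothetical, not part of the original module): the tuple is
# consumed index by index, clamping at the last entry, so from the sixth batch
# on each /pre-upload query may contain up to 100 items. See
# batch_items_for_check() below.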

# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

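# Illustration (hypothetical, not part of the original module): the patterns
# are matched against relative paths, e.g.
#   re.match(DEFAULT_BLACKLIST[0], 'foo/bar.pyc')  # match -> skipped
#   re.match(DEFAULT_BLACKLIST[1], '.git')         # match -> skipped
#   re.match(DEFAULT_BLACKLIST[0], 'foo/bar.py')   # None  -> kept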

# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total

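# A minimal round-trip sketch (hypothetical, not part of the original module),
# assuming a writable temporary directory:
#
#   path = os.path.join(tempfile.mkdtemp(), u'out.bin')
#   written = file_write(path, ('chunk1', 'chunk2'))  # -> 12
#   data = ''.join(file_read(path))                   # -> 'chunk1chunk2'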

def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (for
  # example the standard library's open("test.txt")), name will end up being a
  # str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)

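# A minimal usage sketch (hypothetical, not part of the original module),
# copying between in-memory buffers:
#
#   src = io.BytesIO('x' * 100)
#   dst = io.BytesIO()
#   fileobj_copy(dst, src, size=100)
#   assert dst.getvalue() == 'x' * 100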

def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks share the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty under a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)

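# A minimal usage sketch (hypothetical, not part of the original module),
# materializing an in-memory buffer on disk:
#
#   putfile(io.BytesIO('payload'), u'/tmp/out.bin', file_mode=0400)
#
# A BytesIO has no file system path, so fileobj_path() returns None and the
# content is written out via fileobj_copy(); a source backed by an actual file
# could be hardlinked or symlinked instead of copied.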

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

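# A minimal round-trip sketch (hypothetical, not part of the original module):
#
#   chunks = ['hello ', 'world']
#   compressed = ''.join(zip_compress(chunks, level=7))
#   assert ''.join(zip_decompress([compressed])) == 'hello world'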

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension
  # actually matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

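# Illustration (hypothetical, not part of the original module):
#
#   get_zip_compression_level('photo.JPG')   # -> 0 (already compressed)
#   get_zip_compression_level('binary.dll')  # -> 7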

def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Computes the full set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      fs.mkdir(os.path.join(base_directory, d))

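# Illustration (hypothetical, not part of the original module): for files
# ['a/b/c.txt', 'a/d.txt'] the computed directory set is {'a', 'a/b'}, and the
# directories are created in sorted (parent-first) order under base_directory.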

def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]

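# A minimal usage sketch (hypothetical, not part of the original module):
#
#   import hashlib
#   item = BufferItem('hello')
#   item.prepare(hashlib.sha1)
#   item.digest  # -> 'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'
#   item.size    # -> 5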
437
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000438class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800439 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000440
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800441 Implements compression support, parallel 'contains' checks, parallel uploads
442 and more.
443
444 Works only within single namespace (and thus hashing algorithm and compression
445 scheme are fixed).
446
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400447 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
448 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800449 """
450
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700451 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000452 self._storage_api = storage_api
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400453 self._use_zip = isolated_format.is_namespace_with_compression(
454 storage_api.namespace)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400455 self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000456 self._cpu_thread_pool = None
457 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400458 self._aborted = False
459 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000460
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000461 @property
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700462 def hash_algo(self):
463 """Hashing algorithm used to name files in storage based on their content.
464
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400465 Defined by |namespace|. See also isolated_format.get_hash_algo().
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700466 """
467 return self._hash_algo
468
469 @property
470 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500471 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700472 return self._storage_api.location
473
474 @property
475 def namespace(self):
476 """Isolate namespace used by this storage.
477
478 Indirectly defines hashing scheme and compression method used.
479 """
480 return self._storage_api.namespace
481
482 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000483 def cpu_thread_pool(self):
484 """ThreadPool for CPU-bound tasks like zipping."""
485 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500486 threads = max(threading_utils.num_processors(), 2)
487 if sys.maxsize <= 2L**32:
488 # On 32 bits userland, do not try to use more than 16 threads.
489 threads = min(threads, 16)
490 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000491 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000492
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000493 @property
494 def net_thread_pool(self):
495 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
496 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700497 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000498 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000499
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000500 def close(self):
501 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400502 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000503 if self._cpu_thread_pool:
504 self._cpu_thread_pool.join()
505 self._cpu_thread_pool.close()
506 self._cpu_thread_pool = None
507 if self._net_thread_pool:
508 self._net_thread_pool.join()
509 self._net_thread_pool.close()
510 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400511 logging.info('Done.')
512
513 def abort(self):
514 """Cancels any pending or future operations."""
515 # This is not strictly theadsafe, but in the worst case the logging message
516 # will be printed twice. Not a big deal. In other places it is assumed that
517 # unprotected reads and writes to _aborted are serializable (it is true
518 # for python) and thus no locking is used.
519 if not self._aborted:
520 logging.warning('Aborting... It can take a while.')
521 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000522
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000523 def __enter__(self):
524 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400525 assert not self._prev_sig_handlers, self._prev_sig_handlers
526 for s in (signal.SIGINT, signal.SIGTERM):
527 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000528 return self
529
530 def __exit__(self, _exc_type, _exc_value, _traceback):
531 """Context manager interface."""
532 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400533 while self._prev_sig_handlers:
534 s, h = self._prev_sig_handlers.popitem()
535 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000536 return False
537
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000538 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800539 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000540
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800541 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000542
543 Arguments:
544 items: list of Item instances that represents data to upload.
545
546 Returns:
547 List of items that were uploaded. All other items are already there.
548 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700549 logging.info('upload_items(items=%d)', len(items))
550
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800551 # Ensure all digests are calculated.
552 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700553 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800554
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000555 # For each digest keep only first Item that matches it. All other items
556 # are just indistinguishable copies from the point of view of isolate
557 # server (it doesn't care about paths at all, only content and digests).
558 seen = {}
559 duplicates = 0
560 for item in items:
561 if seen.setdefault(item.digest, item) is not item:
562 duplicates += 1
563 items = seen.values()
564 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700565 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000566
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000567 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000568 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000569 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800570 channel = threading_utils.TaskChannel()
571 for missing_item, push_state in self.get_missing_items(items):
572 missing.add(missing_item)
573 self.async_push(channel, missing_item, push_state)
574
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000575 # No need to spawn deadlock detector thread if there's nothing to upload.
576 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700577 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000578 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000579 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000580 detector.ping()
581 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000582 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000583 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000584 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000585 logging.info('All files are uploaded')
586
587 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000588 total = len(items)
589 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000590 logging.info(
591 'Total: %6d, %9.1fkb',
592 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000593 total_size / 1024.)
594 cache_hit = set(items) - missing
595 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000596 logging.info(
597 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
598 len(cache_hit),
599 cache_hit_size / 1024.,
600 len(cache_hit) * 100. / total,
601 cache_hit_size * 100. / total_size if total_size else 0)
602 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000603 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000604 logging.info(
605 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
606 len(cache_miss),
607 cache_miss_size / 1024.,
608 len(cache_miss) * 100. / total,
609 cache_miss_size * 100. / total_size if total_size else 0)
610
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000611 return uploaded
612
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800613 def async_push(self, channel, item, push_state):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000614 """Starts asynchronous push to the server in a parallel thread.
615
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800616 Can be used only after |item| was checked for presence on a server with
617 'get_missing_items' call. 'get_missing_items' returns |push_state| object
618 that contains storage specific information describing how to upload
619 the item (for example in case of cloud storage, it is signed upload URLs).
620
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000621 Arguments:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000622 channel: TaskChannel that receives back |item| when upload ends.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000623 item: item to upload as instance of Item class.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800624 push_state: push state returned by 'get_missing_items' call for |item|.
625
626 Returns:
627 None, but |channel| later receives back |item| when upload ends.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000628 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800629 # Thread pool task priority.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400630 priority = (
Vadim Shtayura3148e072014-09-02 18:51:52 -0700631 threading_utils.PRIORITY_HIGH if item.high_priority
632 else threading_utils.PRIORITY_MED)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800633
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000634 def push(content):
Marc-Antoine Ruel095a8be2014-03-21 14:58:19 -0400635 """Pushes an Item and returns it to |channel|."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400636 if self._aborted:
637 raise Aborted()
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700638 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800639 self._storage_api.push(item, push_state, content)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000640 return item
641
642 # If zipping is not required, just start a push task.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700643 if not self._use_zip:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800644 self.net_thread_pool.add_task_with_channel(
645 channel, priority, push, item.content())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000646 return
647
648 # If zipping is enabled, zip in a separate thread.
649 def zip_and_push():
650 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
651 # content right here. It will block until all file is zipped.
652 try:
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400653 if self._aborted:
654 raise Aborted()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800655 stream = zip_compress(item.content(), item.compression_level)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000656 data = ''.join(stream)
657 except Exception as exc:
658 logging.error('Failed to zip \'%s\': %s', item, exc)
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800659 channel.send_exception()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000660 return
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000661 self.net_thread_pool.add_task_with_channel(
662 channel, priority, push, [data])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000663 self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000664
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800665 def push(self, item, push_state):
666 """Synchronously pushes a single item to the server.
667
668 If you need to push many items at once, consider using 'upload_items' or
669 'async_push' with instance of TaskChannel.
670
671 Arguments:
672 item: item to upload as instance of Item class.
673 push_state: push state returned by 'get_missing_items' call for |item|.
674
675 Returns:
676 Pushed item (same object as |item|).
677 """
678 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700679 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800680 self.async_push(channel, item, push_state)
681 pushed = channel.pull()
682 assert pushed is item
683 return item
684
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000685 def async_fetch(self, channel, priority, digest, size, sink):
686 """Starts asynchronous fetch from the server in a parallel thread.
687
688 Arguments:
689 channel: TaskChannel that receives back |digest| when download ends.
690 priority: thread pool task priority for the fetch.
691 digest: hex digest of an item to download.
692 size: expected size of the item (after decompression).
693 sink: function that will be called as sink(generator).
694 """
695 def fetch():
696 try:
697 # Prepare reading pipeline.
698 stream = self._storage_api.fetch(digest)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700699 if self._use_zip:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400700 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000701 # Run |stream| through verifier that will assert its size.
702 verifier = FetchStreamVerifier(stream, size)
703 # Verified stream goes to |sink|.
704 sink(verifier.run())
705 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800706 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000707 raise
708 return digest
709
710 # Don't bother with zip_thread_pool for decompression. Decompression is
711 # really fast and most probably IO bound anyway.
712 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
713
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000714 def get_missing_items(self, items):
715 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000716
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000717 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000718
719 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000720 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000721
722 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800723 For each missing item it yields a pair (item, push_state), where:
724 * item - Item object that is missing (one of |items|).
725 * push_state - opaque object that contains storage specific information
726 describing how to upload the item (for example in case of cloud
727 storage, it is signed upload URLs). It can later be passed to
728 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000729 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000730 channel = threading_utils.TaskChannel()
731 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800732
733 # Ensure all digests are calculated.
734 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700735 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800736
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400737 def contains(batch):
738 if self._aborted:
739 raise Aborted()
740 return self._storage_api.contains(batch)
741
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000742 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800743 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400744 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400745 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000746 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800747
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000748 # Yield results as they come in.
749 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800750 for missing_item, push_state in channel.pull().iteritems():
751 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000752
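# A minimal usage sketch (hypothetical, not part of the original module),
# assuming a StorageApi instance such as the IsolateServer class below:
#
#   storage_api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with Storage(storage_api) as storage:
#     uploaded = storage.upload_items([FileItem(u'/tmp/data.bin')])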

def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

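# Illustration (hypothetical, not part of the original module): items are
# sorted largest first, so the files most likely to have changed land in the
# earliest (smallest) batches:
#
#   batches = list(batch_items_for_check(items))
#   [len(b) for b in batches]  # e.g. [20, 20, 50, 50, 10] for 150 items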

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it were fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

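# A minimal usage sketch (hypothetical, not part of the original module),
# assuming a Storage instance and a LocalCache implementation:
#
#   queue = FetchQueue(storage, cache)
#   queue.add(digest, size=1024)
#   queue.wait([digest])  # blocks until the item is written to the cache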

class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies that |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))

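# A minimal usage sketch (hypothetical, not part of the original module):
#
#   verifier = FetchStreamVerifier(iter(['abc', 'def']), 6)
#   ''.join(verifier.run())  # -> 'abcdef'; raises IOError on size mismatch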
927
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000928class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800929 """Interface for classes that implement low-level storage operations.
930
931 StorageApi is oblivious of compression and hashing scheme used. This details
932 are handled in higher level Storage class.
933
934 Clients should generally not use StorageApi directly. Storage class is
935 preferred since it implements compression and upload optimizations.
936 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000937
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700938 @property
939 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500940 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700941 raise NotImplementedError()
942
943 @property
944 def namespace(self):
945 """Isolate namespace used by this storage.
946
947 Indirectly defines hashing scheme and compression method used.
948 """
949 raise NotImplementedError()
950
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800951 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000952 """Fetches an object and yields its content.
953
954 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000955 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800956 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000957
958 Yields:
959 Chunks of downloaded item (as str objects).
960 """
961 raise NotImplementedError()
962
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800963 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000964 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000965
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800966 |item| MUST go through 'contains' call to get |push_state| before it can
967 be pushed to the storage.
968
969 To be clear, here is one possible usage:
970 all_items = [... all items to push as Item subclasses ...]
971 for missing_item, push_state in storage_api.contains(all_items).items():
972 storage_api.push(missing_item, push_state)
973
974 When pushing to a namespace with compression, data that should be pushed
975 and data provided by the item is not the same. In that case |content| is
976 not None and it yields chunks of compressed data (using item.content() as
977 a source of original uncompressed data). This is implemented by Storage
978 class.
979
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000980 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000981 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800982 push_state: push state object as returned by 'contains' call.
983 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000984
985 Returns:
986 None.
987 """
988 raise NotImplementedError()
989
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000990 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800991 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000992
993 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800994 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000995
996 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800997 A dict missing Item -> opaque push state object to be passed to 'push'.
998 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000999 """
1000 raise NotImplementedError()
1001
1002
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001003class _IsolateServerPushState(object):
1004 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -05001005
1006 Note this needs to be a global class to support pickling.
1007 """
1008
Cory Massarocc19c8c2015-03-10 13:35:11 -07001009 def __init__(self, preupload_status, size):
1010 self.preupload_status = preupload_status
1011 gs_upload_url = preupload_status.get('gs_upload_url') or None
1012 if gs_upload_url:
1013 self.upload_url = gs_upload_url
1014 self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
1015 else:
1016 self.upload_url = '_ah/api/isolateservice/v1/store_inline'
1017 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -05001018 self.uploaded = False
1019 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001020 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -05001021
1022
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001023class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001024 """StorageApi implementation that downloads and uploads to Isolate Server.
1025
1026 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001027 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001028 """
1029
maruel@chromium.org3e42ce82013-09-12 18:36:59 +00001030 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001031 super(IsolateServer, self).__init__()
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001032 assert file_path.is_url(base_url), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001033 self._base_url = base_url.rstrip('/')
1034 self._namespace = namespace
Cory Massarocc19c8c2015-03-10 13:35:11 -07001035 self._namespace_dict = {
1036 'compression': 'flate' if namespace.endswith(
1037 ('-gzip', '-flate')) else '',
1038 'digest_hash': 'sha-1',
1039 'namespace': namespace,
1040 }
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001041 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001042 self._server_caps = None
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001043 self._memory_use = 0
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001044
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001045 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001046 def _server_capabilities(self):
Cory Massarocc19c8c2015-03-10 13:35:11 -07001047 """Gets server details.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001048
1049 Returns:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001050 Server capabilities dictionary as returned by /server_details endpoint.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001051 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +00001052 # TODO(maruel): Make this request much earlier asynchronously while the
1053 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001054
1055 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
1056 # namespace-level ACLs to this call.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001057
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001058 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001059 if self._server_caps is None:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001060 self._server_caps = net.url_read_json(
1061 url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
1062 data={})
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001063 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001064
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001065 @property
1066 def location(self):
1067 return self._base_url
1068
1069 @property
1070 def namespace(self):
1071 return self._namespace
1072
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001073 def fetch(self, digest, offset=0):
Cory Massarocc19c8c2015-03-10 13:35:11 -07001074 assert offset >= 0
1075 source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
1076 self._base_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001077 logging.debug('download_file(%s, %d)', source_url, offset)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001078 response = self.do_fetch(source_url, digest, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001079
Cory Massarocc19c8c2015-03-10 13:35:11 -07001080 if not response:
maruele154f9c2015-09-14 11:03:15 -07001081 raise IOError(
1082 'Attempted to fetch from %s; no data exist: %s / %s.' % (
1083 source_url, self._namespace, digest))
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001084
Cory Massarocc19c8c2015-03-10 13:35:11 -07001085 # for DB uploads
1086 content = response.get('content')
1087 if content is not None:
maruel863ac262016-03-17 11:00:37 -07001088 yield base64.b64decode(content)
1089 return
Cory Massarocc19c8c2015-03-10 13:35:11 -07001090
1091 # for GS entities
1092 connection = net.url_open(response['url'])
maruelf5574752015-09-17 13:40:27 -07001093 if not connection:
1094 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
Cory Massarocc19c8c2015-03-10 13:35:11 -07001095
1096 # If |offset|, verify server respects it by checking Content-Range.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001097 if offset:
1098 content_range = connection.get_header('Content-Range')
1099 if not content_range:
1100 raise IOError('Missing Content-Range header')
1101
1102 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1103 # According to a spec, <size> can be '*' meaning "Total size of the file
1104 # is not known in advance".
1105 try:
1106 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1107 if not match:
1108 raise ValueError()
1109 content_offset = int(match.group(1))
1110 last_byte_index = int(match.group(2))
1111 size = None if match.group(3) == '*' else int(match.group(3))
1112 except ValueError:
1113 raise IOError('Invalid Content-Range header: %s' % content_range)
1114
1115 # Ensure returned offset equals requested one.
1116 if offset != content_offset:
1117 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1118 offset, content_offset, content_range))
1119
1120 # Ensure entire tail of the file is returned.
1121 if size is not None and last_byte_index + 1 != size:
1122 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1123
maruel863ac262016-03-17 11:00:37 -07001124 for data in connection.iter_content(NET_IO_FILE_CHUNK):
1125 yield data
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001126
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001127 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001128 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001129 assert item.digest is not None
1130 assert item.size is not None
1131 assert isinstance(push_state, _IsolateServerPushState)
1132 assert not push_state.finalized
1133
1134 # Default to item.content().
1135 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001136 logging.info('Push state size: %d', push_state.size)
1137 if isinstance(content, (basestring, list)):
1138 # Memory is already used, too late.
1139 with self._lock:
1140 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001141 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001142 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1143 # If |content| is indeed a generator, it can not be re-winded back to the
1144 # beginning of the stream. A retry will find it exhausted. A possible
1145 # solution is to wrap |content| generator with some sort of caching
1146 # restartable generator. It should be done alongside streaming support
1147 # implementation.
1148 #
1149 # In theory, we should keep the generator, so that it is not serialized in
1150 # memory. Sadly net.HttpService.request() requires the body to be
1151 # serialized.
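# A minimal sketch of such a caching restartable wrapper, assuming
# chunked str content (illustrative only; not wired in anywhere):
#
#   class _ReplayableContent(object):
#     """Caches chunks on first pass so iteration can restart."""
#     def __init__(self, gen):
#       self._gen = gen
#       self._chunks = []
#     def __iter__(self):
#       for chunk in self._chunks:  # Replay previously consumed chunks.
#         yield chunk
#       for chunk in self._gen:  # Then keep draining the original.
#         self._chunks.append(chunk)
#         yield chunk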
1152 assert isinstance(content, types.GeneratorType), repr(content)
1153 slept = False
1154 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001155 # On 32 bits python, one byte less than 512mb; copes with incompressible content.
1156 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001157 while True:
1158 with self._lock:
1159 # This is needed on 32 bits python when uploading very large files. The
1160 # problem is that it compares uncompressed sizes, while we care about
1161 # compressed sizes since that is what is serialized in memory.
1162 # The first check assumes large files are compressible and that by
1163 # throttling uploads to one at a time, we can survive. Otherwise, kaboom.
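# Worked example (illustrative, assuming max_size is ~512mb): a single
# 600mb item is admitted only when nothing else is in flight (first
# check below); two 200mb items can be in flight together since their
# sum stays under the cap (second check below).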
1164 memory_use = self._memory_use
1165 if ((push_state.size >= max_size and not memory_use) or
1166 (memory_use + push_state.size <= max_size)):
1167 self._memory_use += push_state.size
1168 memory_use = self._memory_use
1169 break
1170 time.sleep(0.1)
1171 slept = True
1172 if slept:
1173 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001174
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001175 try:
1176 # This push operation may be a retry after a failed finalization call below,
1177 # no need to reupload contents in that case.
1178 if not push_state.uploaded:
1179 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001180 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001181 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001182 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001183 item.digest, push_state.upload_url))
1184 push_state.uploaded = True
1185 else:
1186 logging.info(
1187 'File %s is already uploaded, retrying finalization only',
1188 item.digest)
1189
1190 # Optionally notify the server that it's done.
1191 if push_state.finalize_url:
1192 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1193 # send it to isolated server. That way isolate server can verify that
1194 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1195 # stored files).
1196 # TODO(maruel): Fix the server to accept properly data={} so
1197 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001198 response = net.url_read_json(
1199 url='%s/%s' % (self._base_url, push_state.finalize_url),
1200 data={
1201 'upload_ticket': push_state.preupload_status['upload_ticket'],
1202 })
1203 if not response or not response['ok']:
1204 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001205 push_state.finalized = True
1206 finally:
1207 with self._lock:
1208 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001209
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001210 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001211 # Ensure all items were initialized with 'prepare' call. Storage does that.
1212 assert all(i.digest is not None and i.size is not None for i in items)
1213
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001214 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001215 body = {
1216 'items': [
1217 {
1218 'digest': item.digest,
1219 'is_isolated': bool(item.high_priority),
1220 'size': item.size,
1221 } for item in items
1222 ],
1223 'namespace': self._namespace_dict,
1224 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001225
Cory Massarocc19c8c2015-03-10 13:35:11 -07001226 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001227
1228 # Response lists the missing items with their upload tickets; files already present are omitted.
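# Illustrative response shape (ticket value is fake):
#   {'items': [{'index': '0', 'upload_ticket': 'abc...'}]}
# where 'index' refers back to a position in |items| above.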
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001229 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001230 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001231 response = net.url_read_json(url=query_url, data=body)
1232 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001233 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001234 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001235 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001236 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001237 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001238
1239 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001240 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001241 for preupload_status in response.get('items', []):
1242 assert 'upload_ticket' in preupload_status, (
1243 preupload_status, '/preupload did not generate an upload ticket')
1244 index = int(preupload_status['index'])
1245 missing_items[items[index]] = _IsolateServerPushState(
1246 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001247 logging.info('Queried %d files, %d cache hits',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001248 len(items), len(items) - len(missing_items))
1249 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001250
Cory Massarocc19c8c2015-03-10 13:35:11 -07001251 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001252 """Fetches isolated data from the URL.
1253
1254 Used only for fetching files, not for API calls. Can be overridden in
1255 subclasses.
1256
1257 Args:
1258 url: URL to fetch the data from, can possibly return http redirect.
digest: hash digest of the isolated item being fetched.
1259 offset: byte offset inside the file to start fetching from.
1260
1261 Returns:
1262 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1263 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001264 assert isinstance(offset, int)
1265 data = {
1266 'digest': digest.encode('utf-8'),
1267 'namespace': self._namespace_dict,
1268 'offset': offset,
1269 }
maruel0c25f4f2015-12-15 05:41:17 -08001270 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1271 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001272 return net.url_read_json(
1273 url=url,
1274 data=data,
1275 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001276
Cory Massarocc19c8c2015-03-10 13:35:11 -07001277 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001278 """Uploads isolated file to the URL.
1279
1280 Used only for storing files, not for API calls. Can be overridden in
1281 subclasses.
1282
1283 Args:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001285 push_state: an _IsolateServerPushState instance.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001287 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001288 """
1289 # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
1290 # upload support is implemented.
1291 if isinstance(content, list) and len(content) == 1:
1292 content = content[0]
1293 else:
1294 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001295
1296 # DB upload
1297 if not push_state.finalize_url:
1298 url = '%s/%s' % (self._base_url, push_state.upload_url)
1299 content = base64.b64encode(content)
1300 data = {
1301 'upload_ticket': push_state.preupload_status['upload_ticket'],
1302 'content': content,
1303 }
1304 response = net.url_read_json(url=url, data=data)
1305 return response is not None and response['ok']
1306
1307 # upload to GS
1308 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001309 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001310 content_type='application/octet-stream',
1311 data=content,
1312 method='PUT',
tandriib44d54d2016-02-10 11:31:41 -08001313 headers={'Cache-Control': 'public, max-age=31536000'},
Cory Massarocc19c8c2015-03-10 13:35:11 -07001314 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001315 return response is not None
1316
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001317
nodir445097b2016-06-03 22:50:26 -07001318class CacheMiss(Exception):
1319 """Raised when an item is not in cache."""
1320
1321 def __init__(self, digest):
1322 self.digest = digest
1323 super(CacheMiss, self).__init__(
1324 'Item with digest %r is not found in cache' % digest)
1325
1326
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001327class LocalCache(object):
1328 """Local cache that stores objects fetched via Storage.
1329
1330 It can be accessed concurrently from multiple threads, so it should protect
1331 its internal state with some lock.
1332 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001333 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001334
maruel064c0a32016-04-05 11:47:15 -07001335 def __init__(self):
1336 self._lock = threading_utils.LockWithAssert()
1337 # Profiling values.
1338 self._added = []
1339 self._initial_number_items = 0
1340 self._initial_size = 0
1341 self._evicted = []
tansell9e04a8d2016-07-28 09:31:59 -07001342 self._used = []
maruel064c0a32016-04-05 11:47:15 -07001343
nodirbe642ff2016-06-09 15:51:51 -07001344 def __contains__(self, digest):
1345 raise NotImplementedError()
1346
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001347 def __enter__(self):
1348 """Context manager interface."""
1349 return self
1350
1351 def __exit__(self, _exc_type, _exec_value, _traceback):
1352 """Context manager interface."""
1353 return False
1354
maruel064c0a32016-04-05 11:47:15 -07001355 @property
1356 def added(self):
1357 return self._added[:]
1358
1359 @property
1360 def evicted(self):
1361 return self._evicted[:]
1362
1363 @property
tansell9e04a8d2016-07-28 09:31:59 -07001364 def used(self):
1365 return self._used[:]
1366
1367 @property
maruel064c0a32016-04-05 11:47:15 -07001368 def initial_number_items(self):
1369 return self._initial_number_items
1370
1371 @property
1372 def initial_size(self):
1373 return self._initial_size
1374
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001375 def cached_set(self):
1376 """Returns a set of all cached digests (always a new object)."""
1377 raise NotImplementedError()
1378
maruel36a963d2016-04-08 17:15:49 -07001379 def cleanup(self):
1380 """Deletes any corrupted item from the cache and trims it if necessary."""
1381 raise NotImplementedError()
1382
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001383 def touch(self, digest, size):
1384 """Ensures item is not corrupted and updates its LRU position.
1385
1386 Arguments:
1387 digest: hash digest of item to check.
1388 size: expected size of this item.
1389
1390 Returns:
1391 True if item is in cache and not corrupted.
1392 """
1393 raise NotImplementedError()
1394
1395 def evict(self, digest):
1396 """Removes item from cache if it's there."""
1397 raise NotImplementedError()
1398
tansell9e04a8d2016-07-28 09:31:59 -07001399 def getfileobj(self, digest):
1400 """Returns a readable file like object.
1401
1402 If file exists on the file system it will have a .name attribute with an
1403 absolute path to the file.
1404 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001405 raise NotImplementedError()
1406
1407 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001408 """Reads data from the |content| generator and stores it in cache.
1409
1410 Returns digest to simplify chaining.
1411 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001412 raise NotImplementedError()
1413
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001414
1415class MemoryCache(LocalCache):
1416 """LocalCache implementation that stores everything in memory."""
1417
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001418 def __init__(self, file_mode_mask=0500):
1419 """Args:
1420 file_mode_mask: bit mask to AND file mode with. Default value will make
1421 all mapped files to be read only.
1422 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001423 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001424 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001425 self._contents = {}
1426
nodirbe642ff2016-06-09 15:51:51 -07001427 def __contains__(self, digest):
1428 with self._lock:
1429 return digest in self._contents
1430
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001431 def cached_set(self):
1432 with self._lock:
1433 return set(self._contents)
1434
maruel36a963d2016-04-08 17:15:49 -07001435 def cleanup(self):
1436 pass
1437
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001438 def touch(self, digest, size):
1439 with self._lock:
1440 return digest in self._contents
1441
1442 def evict(self, digest):
1443 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001444 v = self._contents.pop(digest, None)
1445 if v is not None:
1446 # |_evicted| is a list of sizes (see _delete_file), so record the size.
self._evicted.append(len(v))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001447
tansell9e04a8d2016-07-28 09:31:59 -07001448 def getfileobj(self, digest):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001449 with self._lock:
nodir445097b2016-06-03 22:50:26 -07001450 try:
tansell9e04a8d2016-07-28 09:31:59 -07001451 d = self._contents[digest]
nodir445097b2016-06-03 22:50:26 -07001452 except KeyError:
1453 raise CacheMiss(digest)
tansell9e04a8d2016-07-28 09:31:59 -07001454 self._used.append(len(d))
1455 return io.BytesIO(d)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001456
1457 def write(self, digest, content):
1458 # Assemble whole stream before taking the lock.
1459 data = ''.join(content)
1460 with self._lock:
1461 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001462 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001463 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001464
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001465
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001466class CachePolicies(object):
1467 def __init__(self, max_cache_size, min_free_space, max_items):
1468 """
1469 Arguments:
1470 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1471 cache is effectively a leak.
1472 - min_free_space: Trim if disk free space becomes lower than this value. If
1473 0, it unconditionally fills the disk.
1474 - max_items: Maximum number of items to keep in the cache. If 0, do not
1475 enforce a limit.
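
    Example (illustrative values for a bot-sized cache):
      CachePolicies(
          max_cache_size=20 * 1024 * 1024 * 1024,  # 20gb of content
          min_free_space=2 * 1024 * 1024 * 1024,   # keep 2gb free
          max_items=100000)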
1476 """
1477 self.max_cache_size = max_cache_size
1478 self.min_free_space = min_free_space
1479 self.max_items = max_items
1480
1481
1482class DiskCache(LocalCache):
1483 """Stateful LRU cache in a flat hash table in a directory.
1484
1485 Saves its state as a json file.
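
  The on-disk layout is flat (illustrative):
    <cache_dir>/state.json   # serialized LRU state, written by _save()
    <cache_dir>/<digest>     # one read-only file per cached item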
1486 """
maruel12e30012015-10-09 11:55:35 -07001487 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001488
1489 def __init__(self, cache_dir, policies, hash_algo):
1490 """
1491 Arguments:
1492 cache_dir: directory where to place the cache.
1493 policies: cache retention policies.
1494 hash_algo: hashing algorithm used.
1495 """
maruel064c0a32016-04-05 11:47:15 -07001496 # All protected methods (starting with '_') except _path should be called
1497 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001498 super(DiskCache, self).__init__()
1499 self.cache_dir = cache_dir
1500 self.policies = policies
1501 self.hash_algo = hash_algo
1502 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001503 # Items in an LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001504 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001505 # Current cached free disk space. It is updated by self._trim().
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001506 self._free_disk = 0
maruel2e8d0f52016-07-16 07:51:29 -07001507 # The first item in the LRU cache that must not be evicted during this run
1508 # since it was referenced. All items more recent that _protected in the LRU
1509 # cache are also inherently protected. It could be a set() of all items
1510 # referenced but this increases memory usage without a use case.
1511 self._protected = None
maruel36a963d2016-04-08 17:15:49 -07001512 # Cleanup operations done by self._load(), if any.
1513 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001514 with tools.Profiler('Setup'):
1515 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001516 # self._load() calls self._trim() which initializes self._free_disk.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001517 self._load()
1518
nodirbe642ff2016-06-09 15:51:51 -07001519 def __contains__(self, digest):
1520 with self._lock:
1521 return digest in self._lru
1522
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001523 def __enter__(self):
1524 return self
1525
1526 def __exit__(self, _exc_type, _exec_value, _traceback):
1527 with tools.Profiler('CleanupTrimming'):
1528 with self._lock:
1529 self._trim()
1530
1531 logging.info(
1532 '%5d (%8dkb) added',
1533 len(self._added), sum(self._added) / 1024)
1534 logging.info(
1535 '%5d (%8dkb) current',
1536 len(self._lru),
1537 sum(self._lru.itervalues()) / 1024)
1538 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001539 '%5d (%8dkb) evicted',
1540 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001541 logging.info(
1542 ' %8dkb free',
1543 self._free_disk / 1024)
1544 return False
1545
1546 def cached_set(self):
1547 with self._lock:
1548 return self._lru.keys_set()
1549
maruel36a963d2016-04-08 17:15:49 -07001550 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001551 """Cleans up the cache directory.
1552
1553 Ensures there are no unknown files in cache_dir.
1554 Ensures the read-only bits are set correctly.
1555
1556 At this point, the cache has already been loaded and trimmed to respect
1557 cache policies.
1558 """
1559 fs.chmod(self.cache_dir, 0700)
1560 # Ensure that all files listed in the state still exist and add new ones.
1561 previous = self._lru.keys_set()
1562 # It'd be faster if there were a readdir() function.
1563 for filename in fs.listdir(self.cache_dir):
1564 if filename == self.STATE_FILE:
1565 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1566 continue
1567 if filename in previous:
1568 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1569 previous.remove(filename)
1570 continue
1571
1572 # An untracked file. Delete it.
1573 logging.warning('Removing unknown file %s from cache', filename)
1574 p = self._path(filename)
1575 if fs.isdir(p):
1576 try:
1577 file_path.rmtree(p)
1578 except OSError:
1579 pass
1580 else:
1581 file_path.try_remove(p)
1582 continue
1583
1584 if previous:
1585 # Filter out entries that were not found.
1586 logging.warning('Removed %d lost files', len(previous))
1587 for filename in previous:
1588 self._lru.pop(filename)
maruel36a963d2016-04-08 17:15:49 -07001589
1590 # What remains to be done is to hash every single item to
1591 # detect corruption, then save to ensure state.json is up to date.
1592 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1593 # TODO(maruel): Let's revisit once directory metadata is stored in
1594 # state.json so only the files that had been mapped since the last cleanup()
1595 # call are manually verified.
1596 #
1597 #with self._lock:
1598 # for digest in self._lru:
1599 # if not isolated_format.is_valid_hash(
1600 # self._path(digest), self.hash_algo):
1601 # self.evict(digest)
1602 # logging.info('Deleted corrupted item: %s', digest)
1603
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001604 def touch(self, digest, size):
1605 """Verifies an actual file is valid.
1606
1607 Note that it doesn't compute the hash, so the file could still be corrupted
1608 if its size didn't change.
1609
1610 TODO(maruel): More stringent verification while keeping the check fast.
1611 """
1612 # Do the check outside the lock.
1613 if not is_valid_file(self._path(digest), size):
1614 return False
1615
1616 # Update its LRU position.
1617 with self._lock:
1618 if digest not in self._lru:
1619 return False
1620 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001621 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001622 return True
1623
1624 def evict(self, digest):
1625 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001626 # Do not check for 'digest == self._protected' since it could be because
maruel083fa552016-04-08 14:38:01 -07001627 # the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001628 self._lru.pop(digest)
1629 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1630
tansell9e04a8d2016-07-28 09:31:59 -07001631 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001632 try:
tansell9e04a8d2016-07-28 09:31:59 -07001633 f = fs.open(self._path(digest), 'rb')
1634 with self._lock:
1635 self._used.append(self._lru[digest])
1636 return f
nodir445097b2016-06-03 22:50:26 -07001637 except IOError:
1638 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001639
1640 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001641 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001642 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001643 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001644 path = self._path(digest)
1645 # A stale broken file may remain. It is possible for the file to have the
1646 # write access bit removed, which would cause the file_write() call to fail
1647 # to open it in write mode. Take no chances here.
1648 file_path.try_remove(path)
1649 try:
1650 size = file_write(path, content)
1651 except:
1652 # There are two possible places were an exception can occur:
1653 # 1) Inside |content| generator in case of network or unzipping errors.
1654 # 2) Inside file_write itself in case of disk IO errors.
1655 # In any case delete an incomplete file and propagate the exception to
1656 # caller, it will be logged there.
1657 file_path.try_remove(path)
1658 raise
1659 # Make the file read-only in the cache. This has a few side-effects since
1660 # the file node is modified, so every directory entry to this file becomes
1661 # read-only. It's fine here because it is a new file.
1662 file_path.set_read_only(path, True)
1663 with self._lock:
1664 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001665 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001666
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001667 def _load(self):
maruel2e8d0f52016-07-16 07:51:29 -07001668 """Loads state of the cache from json file.
1669
1670 If cache_dir does not exist on disk, it is created.
1671 """
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001672 self._lock.assert_locked()
1673
maruel2e8d0f52016-07-16 07:51:29 -07001674 if not fs.isfile(self.state_file):
1675 if not os.path.isdir(self.cache_dir):
1676 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001677 else:
maruel2e8d0f52016-07-16 07:51:29 -07001678 # Load state of the cache.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001679 try:
1680 self._lru = lru.LRUDict.load(self.state_file)
1681 except ValueError as err:
1682 logging.error('Failed to load cache state: %s', err)
1683 # Don't want to keep broken state file.
1684 file_path.try_remove(self.state_file)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001685 self._trim()
maruel2e8d0f52016-07-16 07:51:29 -07001686 # We want the initial cache size after trimming, i.e. what is readily
1687 # available.
1688 self._initial_number_items = len(self._lru)
1689 self._initial_size = sum(self._lru.itervalues())
1690 if self._evicted:
1691 logging.info(
1692 'Trimmed items with the following sizes: %s',
1693 sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001694
1695 def _save(self):
1696 """Saves the LRU ordering."""
1697 self._lock.assert_locked()
1698 if sys.platform != 'win32':
1699 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001700 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001701 # Necessary otherwise the file can't be created.
1702 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001703 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001704 file_path.set_read_only(self.state_file, False)
1705 self._lru.save(self.state_file)
1706
1707 def _trim(self):
1708 """Trims anything we don't know, make sure enough free space exists."""
1709 self._lock.assert_locked()
1710
1711 # Ensure maximum cache size.
1712 if self.policies.max_cache_size:
1713 total_size = sum(self._lru.itervalues())
1714 while total_size > self.policies.max_cache_size:
maruel2e8d0f52016-07-16 07:51:29 -07001715 total_size -= self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001716
1717 # Ensure maximum number of items in the cache.
1718 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1719 for _ in xrange(len(self._lru) - self.policies.max_items):
maruel2e8d0f52016-07-16 07:51:29 -07001720 self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001721
1722 # Ensure enough free space.
1723 self._free_disk = file_path.get_free_space(self.cache_dir)
kjlubickea9abf02016-06-01 09:34:33 -07001724 trimmed_due_to_space = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001725 while (
1726 self.policies.min_free_space and
1727 self._lru and
1728 self._free_disk < self.policies.min_free_space):
kjlubickea9abf02016-06-01 09:34:33 -07001729 trimmed_due_to_space += 1
maruel2e8d0f52016-07-16 07:51:29 -07001730 self._remove_lru_file(True)
kjlubickea9abf02016-06-01 09:34:33 -07001731
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001732 if trimmed_due_to_space:
1733 total_usage = sum(self._lru.itervalues())
1734 usage_percent = 0.
1735 if total_usage and self.policies.max_cache_size:
kjlubickea9abf02016-06-01 09:34:33 -07001736 usage_percent = 100. * float(total_usage) / self.policies.max_cache_size
1737
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001738 logging.warning(
kjlubickea9abf02016-06-01 09:34:33 -07001739 'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
1740 ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
1741 trimmed_due_to_space,
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001742 self._free_disk / 1024.,
1743 total_usage / 1024.,
kjlubickea9abf02016-06-01 09:34:33 -07001744 usage_percent,
1745 self.policies.max_cache_size / 1024.)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001746 self._save()
1747
1748 def _path(self, digest):
1749 """Returns the path to one item."""
1750 return os.path.join(self.cache_dir, digest)
1751
maruel2e8d0f52016-07-16 07:51:29 -07001752 def _remove_lru_file(self, allow_protected):
1753 """Removes the lastest recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001754 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001755 try:
1756 digest, size = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001757 if not allow_protected and digest == self._protected:
1758 raise Error('Not enough space to map the whole isolated tree')
maruel083fa552016-04-08 14:38:01 -07001759 except KeyError:
1760 raise Error('Nothing to remove')
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001761 digest, size = self._lru.pop_oldest()
kjlubickea9abf02016-06-01 09:34:33 -07001762 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001763 self._delete_file(digest, size)
1764 return size
1765
1766 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1767 """Adds an item into LRU cache marking it as a newest one."""
1768 self._lock.assert_locked()
1769 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001770 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001771 self._added.append(size)
1772 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001773 self._free_disk -= size
1774 # Do a quicker version of self._trim(). It only enforces free disk space,
1775 # not cache size limits. It doesn't actually look at real free disk space,
1776 # only uses its cache values. self._trim() will be called later to enforce
1777 # real trimming but doing this quick version here makes it possible to map
1778 # an isolated that is larger than the current amount of free disk space when
1779 # the cache size is already large.
1780 while (
1781 self.policies.min_free_space and
1782 self._lru and
1783 self._free_disk < self.policies.min_free_space):
maruel2e8d0f52016-07-16 07:51:29 -07001784 self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001785
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001786 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1787 """Deletes cache file from the file system."""
1788 self._lock.assert_locked()
1789 try:
1790 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001791 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001792 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001793 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001794 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001795 except OSError as e:
1796 logging.error('Error attempting to delete a file %s:\n%s', digest, e)
1797
1798
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001799class IsolatedBundle(object):
1800 """Fetched and parsed .isolated file with all dependencies."""
1801
Vadim Shtayura3148e072014-09-02 18:51:52 -07001802 def __init__(self):
1803 self.command = []
1804 self.files = {}
1805 self.read_only = None
1806 self.relative_cwd = None
1807 # The main .isolated file, a IsolatedFile instance.
1808 self.root = None
1809
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001810 def fetch(self, fetch_queue, root_isolated_hash, algo):
1811 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001812
1813 It enables support for "included" .isolated files. They are processed in
1814 strict order but fetched asynchronously from the cache. This is important so
1815 that a file in an included .isolated file that is overridden by an embedding
1816 .isolated file is not fetched needlessly. The includes are fetched in one
1817 pass and the files are fetched as soon as all the ones on the left-side
1818 of the tree have been fetched.
1819
1820 The prioritization is very important here for nested .isolated files.
1821 'includes' have the highest priority and the algorithm is optimized for both
1822 deep and wide trees. A deep one is a long link of .isolated files referenced
1823 one at a time by one item in 'includes'. A wide one has a large number of
1824 'includes' in a single .isolated file. 'left' is defined as an included
1825 .isolated file earlier in the 'includes' list. So the order of the elements
1826 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001827
1828 As a side effect this method starts asynchronous fetch of all data files
1829 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1830 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001831 """
1832 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1833
1834 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1835 pending = {}
1836 # Set of hashes of already retrieved items to refuse recursive includes.
1837 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001838 # Set of IsolatedFile's whose data files have already being fetched.
1839 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001840
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001841 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001842 h = isolated_file.obj_hash
1843 if h in seen:
1844 raise isolated_format.IsolatedError(
1845 'IsolatedFile %s is retrieved recursively' % h)
1846 assert h not in pending
1847 seen.add(h)
1848 pending[h] = isolated_file
1849 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1850
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001851 # Start fetching root *.isolated file (single file, not the whole bundle).
1852 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001853
1854 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001855 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001856 item_hash = fetch_queue.wait(pending)
1857 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001858 with fetch_queue.cache.getfileobj(item_hash) as f:
1859 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001860
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001861 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001862 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001863 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001864
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001865 # Always fetch *.isolated files in traversal order, waiting if necessary
1866 # until next to-be-processed node loads. "Waiting" is done by yielding
1867 # back to the outer loop, that waits until some *.isolated is loaded.
1868 for node in isolated_format.walk_includes(self.root):
1869 if node not in processed:
1870 # Not visited, and not yet loaded -> wait for it to load.
1871 if not node.is_loaded:
1872 break
1873 # Not visited and loaded -> process it and continue the traversal.
1874 self._start_fetching_files(node, fetch_queue)
1875 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001876
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001877 # All *.isolated files should be processed by now and only them.
1878 all_isolateds = set(isolated_format.walk_includes(self.root))
1879 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001880
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001881 # Extract 'command' and other bundle properties.
1882 for node in isolated_format.walk_includes(self.root):
1883 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001884 self.relative_cwd = self.relative_cwd or ''
1885
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001886 def _start_fetching_files(self, isolated, fetch_queue):
1887 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001888
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001889 Modifies self.files.
1890 """
1891 logging.debug('_start_fetching_files(%s)', isolated.obj_hash)
1892 for filepath, properties in isolated.data.get('files', {}).iteritems():
1893 # Root isolated has priority on the files being mapped. In particular,
1894 # overridden files must not be fetched.
1895 if filepath not in self.files:
1896 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001897
1898 # Make sure if the isolated is read only, the mode doesn't have write
1899 # bits.
1900 if 'm' in properties and self.read_only:
1901 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1902
1903 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001904 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001905 logging.debug('fetching %s', filepath)
1906 fetch_queue.add(
1907 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1908
1909 def _update_self(self, node):
1910 """Extracts bundle global parameters from loaded *.isolated file.
1911
1912 Will be called with each loaded *.isolated file in order of traversal of
1913 isolated include graph (see isolated_format.walk_includes).
1914 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001915 # Grabs properties.
1916 if not self.command and node.data.get('command'):
1917 # Ensure paths are correctly separated on windows.
1918 self.command = node.data['command']
1919 if self.command:
1920 self.command[0] = self.command[0].replace('/', os.path.sep)
1921 self.command = tools.fix_python_path(self.command)
1922 if self.read_only is None and node.data.get('read_only') is not None:
1923 self.read_only = node.data['read_only']
1924 if (self.relative_cwd is None and
1925 node.data.get('relative_cwd') is not None):
1926 self.relative_cwd = node.data['relative_cwd']
1927
1928
Vadim Shtayura8623c272014-12-01 11:45:27 -08001929def set_storage_api_class(cls):
1930 """Replaces StorageApi implementation used by default."""
1931 global _storage_api_cls
1932 assert _storage_api_cls is None
1933 assert issubclass(cls, StorageApi)
1934 _storage_api_cls = cls
1935
1936
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001937def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001938 """Returns an object that implements the low-level StorageApi interface.
1939
1940 It is used by Storage to work with a single isolate |namespace|. It should
1941 rarely be used directly by clients; see 'get_storage' for
1942 a better alternative.
1943
1944 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001945 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001946 namespace: isolate namespace to operate in, also defines hashing and
1947 compression scheme used, i.e. namespace names that end with '-gzip'
1948 store compressed data.
1949
1950 Returns:
1951 Instance of StorageApi subclass.
1952 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001953 cls = _storage_api_cls or IsolateServer
1954 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001955
1956
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001957def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001958 """Returns a Storage instance that can upload and download from |namespace|.
1959
1960 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001961 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001962 namespace: isolate namespace to operate in, also defines hashing and
1963 compression scheme used, i.e. namespace names that end with '-gzip'
1964 store compressed data.
1965
1966 Returns:
1967 Instance of Storage.
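
  Example (illustrative; the URL is a placeholder):
    with get_storage('https://isolate.example.com', 'default-gzip') as storage:
      uploaded = storage.upload_items(items)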
1968 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001969 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001970
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001971
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001972def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001973 """Uploads the given tree to the given url.
1974
1975 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001976 base_url: The url of the isolate server to upload to.
1977 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001978 namespace: The namespace to use on the server.
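
  An |infiles| pair looks like (illustrative values):
    (u'/abs/path/foo.txt', {'h': 'deadbeef...', 's': 123})
  Pairs whose metadata carries 'l' (symlinks) are skipped, since symlinks
  are not stored as items on the server.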
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001979 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001980 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001981 # Filter out symlinks, since they are not represented by items on isolate
1982 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001983 items = []
1984 seen = set()
1985 skipped = 0
1986 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001987 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001988 if 'l' not in metadata and filepath not in seen:
1989 seen.add(filepath)
1990 item = FileItem(
1991 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001992 digest=metadata['h'],
1993 size=metadata['s'],
1994 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001995 items.append(item)
1996 else:
1997 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001998
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001999 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002000 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07002001 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07002002
2003
maruel4409e302016-07-19 14:25:51 -07002004def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002005 """Aggressively downloads the .isolated file(s), then downloads all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002006
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002007 Arguments:
2008 isolated_hash: hash of the root *.isolated file.
2009 storage: Storage class that communicates with isolate storage.
2010 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002011 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07002012 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002013
2014 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002015 IsolatedBundle object that holds details about loaded *.isolated file.
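
  Example (illustrative; URL and hash are placeholders):
    with get_storage('https://isolate.example.com', 'default-gzip') as storage:
      bundle = fetch_isolated(
          '<isolated_hash>', storage, MemoryCache(), u'/tmp/out', False)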
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002016 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002017 logging.debug(
maruel4409e302016-07-19 14:25:51 -07002018 'fetch_isolated(%s, %s, %s, %s, %s)',
2019 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002020 # Hash algorithm to use, defined by namespace |storage| is using.
2021 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002022 with cache:
2023 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002024 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002025
2026 with tools.Profiler('GetIsolateds'):
2027 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002028 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002029 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07002030 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002031 try:
maruel1ceb3872015-10-14 06:10:44 -07002032 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002033 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002034 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002035 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
2036 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002037
2038 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002039 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002040
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002041 with tools.Profiler('GetRest'):
2042 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07002043 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002044 create_directories(outdir, bundle.files)
2045 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002046
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002047 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002048 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07002049 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002050
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002051 # Multimap: digest -> list of pairs (path, props).
2052 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002053 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002054 if 'h' in props:
2055 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002056
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002057 # Now block on the remaining files to be downloaded and mapped.
2058 logging.info('Retrieving remaining files (%d of them)...',
2059 fetch_queue.pending_count)
2060 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07002061 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002062 while remaining:
2063 detector.ping()
2064
2065 # Wait for any item to finish fetching to cache.
2066 digest = fetch_queue.wait(remaining)
2067
tansell9e04a8d2016-07-28 09:31:59 -07002068 # Create the files in the destination using item in cache as the
2069 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002070 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07002071 fullpath = os.path.join(outdir, filepath)
2072
2073 with cache.getfileobj(digest) as srcfileobj:
2074 file_mode = props.get('m')
2075 if file_mode:
2076 # Ignore all bits apart from the user
2077 file_mode &= 0700
2078 putfile(
2079 srcfileobj, fullpath, file_mode,
2080 use_symlink=use_symlinks)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002081
2082 # Report progress.
2083 duration = time.time() - last_update
2084 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
2085 msg = '%d files remaining...' % len(remaining)
2086 print msg
2087 logging.info(msg)
2088 last_update = time.time()
2089
2090 # Cache could evict some items we just tried to fetch, it's a fatal error.
2091 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002092 raise isolated_format.MappingError(
2093 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002094 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002095
2096
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002097def directory_to_metadata(root, algo, blacklist):
2098 """Returns the FileItem list and .isolated metadata for a directory."""
2099 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04002100 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07002101 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04002102 metadata = {
2103 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05002104 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04002105 for relpath in paths
2106 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002107 for v in metadata.itervalues():
2108 v.pop('t')
2109 items = [
2110 FileItem(
2111 path=os.path.join(root, relpath),
2112 digest=meta['h'],
2113 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002114 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002115 for relpath, meta in metadata.iteritems() if 'h' in meta
2116 ]
2117 return items, metadata
2118
2119
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002120def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05002121 """Stores every entries and returns the relevant data.
2122
2123 Arguments:
2124 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05002125 files: list of file paths to upload. If a directory is specified, a
2126 .isolated file is created and its hash is returned.
2127 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07002128
2129 Returns:
2130 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
2131 The first file in the first item is always the isolated file.
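
  Example (illustrative; a blacklist that filters nothing):
    results, cold, hot = archive_files_to_storage(
        storage, [u'/abs/path/to/dir'], lambda _path: False)
    isolated_hash = results[0][0]  # hash of the generated .isolated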
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05002132 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002133 assert all(isinstance(i, unicode) for i in files), files
2134 if len(files) != len(set(map(os.path.abspath, files))):
2135 raise Error('Duplicate entries found.')
2136
maruel064c0a32016-04-05 11:47:15 -07002137 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002138 results = []
2139 # The temporary directory is only created as needed.
2140 tempdir = None
2141 try:
2142 # TODO(maruel): Yield the files to a worker thread.
2143 items_to_upload = []
2144 for f in files:
2145 try:
2146 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07002147 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002148 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002149 items, metadata = directory_to_metadata(
2150 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002151
2152 # Create the .isolated file.
2153 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002154 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
2155 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002156 os.close(handle)
2157 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002158 'algo':
2159 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002160 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002161 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002162 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002163 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002164 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002165 items_to_upload.extend(items)
2166 items_to_upload.append(
2167 FileItem(
2168 path=isolated,
2169 digest=h,
maruel12e30012015-10-09 11:55:35 -07002170 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002171 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002172 results.append((h, f))
2173
maruel12e30012015-10-09 11:55:35 -07002174 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002175 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002176 items_to_upload.append(
2177 FileItem(
2178 path=filepath,
2179 digest=h,
maruel12e30012015-10-09 11:55:35 -07002180 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002181 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002182 results.append((h, f))
2183 else:
2184 raise Error('%s is neither a file nor a directory.' % f)
2185 except OSError:
2186 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07002187 uploaded = storage.upload_items(items_to_upload)
2188 cold = [i for i in items_to_upload if i in uploaded]
2189 hot = [i for i in items_to_upload if i not in uploaded]
2190 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002191 finally:
maruel12e30012015-10-09 11:55:35 -07002192 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04002193 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002194
2195
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002196def archive(out, namespace, files, blacklist):
2197 if files == ['-']:
2198 files = sys.stdin.readlines()
2199
2200 if not files:
2201 raise Error('Nothing to upload')
2202
2203 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002204 blacklist = tools.gen_blacklist(blacklist)
2205 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07002206 # Ignore stats.
2207 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002208 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2209
2210
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002211@subcommand.usage('<file1..fileN> or - to read from stdin')
2212def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002213 """Archives data to the server.
2214
2215 If a directory is specified, a .isolated file is created and the whole
2216 directory is uploaded. Then this .isolated file can be included in another
2217 one to run commands.
2218
2219 The command outputs each file that was processed with its content hash. For
2220 directories, the .isolated generated for the directory is listed as the
2221 directory entry itself.
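
  Example invocation (illustrative server URL):
    isolateserver.py archive -I https://isolate.example.com some/directory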
2222 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002223 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002224 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002225 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07002226 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002227 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002228 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002229 except Error as e:
2230 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002231 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002232
2233
2234def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002235 """Download data from the server.
2236
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002237 It can either download individual files or a complete tree from a .isolated
2238 file.
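
  Example invocation (illustrative values):
    isolateserver.py download -I https://isolate.example.com \
        -s <isolated_hash> -t out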
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002239 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002240 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002241 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002242 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002243 help='hash of an isolated file, .isolated file content is discarded, use '
2244 '--file if you need it')
2245 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002246 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2247 help='hash and destination of a file, can be used multiple times')
2248 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002249 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002250 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07002251 parser.add_option(
2252 '--use-symlinks', action='store_true',
2253 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002254 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002255 options, args = parser.parse_args(args)
2256 if args:
2257 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002258
nodir55be77b2016-05-03 09:39:57 -07002259 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002260 if bool(options.isolated) == bool(options.file):
2261 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07002262 if not options.cache and options.use_symlinks:
2263 parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
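      # One asynchronous fetch is queued per digest; channel.pull() below
      # returns each digest as its file finishes writing, in completion order.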
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # |rel| is already absolute here since options.target was made absolute
        # above.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test, please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server, or None when
  --isolate-server was not provided and was not required.
  """
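  # For reference: CMDdownload above calls this with set_exception_handler=True
  # and required=True, so a missing --isolate-server aborts via parser.error().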
2337 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07002338 if required:
2339 parser.error('--isolate-server is required.')
2340 return
2341
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002342 try:
2343 options.isolate_server = net.fix_url(options.isolate_server)
2344 except ValueError as e:
2345 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002346 if set_exception_handler:
2347 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002348 try:
2349 return auth.ensure_logged_in(options.isolate_server)
2350 except ValueError as e:
2351 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002352
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002353
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002354def add_cache_options(parser):
2355 cache_group = optparse.OptionGroup(parser, 'Cache management')
2356 cache_group.add_option(
2357 '--cache', metavar='DIR',
2358 help='Directory to keep a local cache of the files. Accelerates download '
2359 'by reusing already downloaded files. Default=%default')
2360 cache_group.add_option(
2361 '--max-cache-size',
2362 type='int',
2363 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002364 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002365 help='Trim if the cache gets larger than this value, default=%default')
2366 cache_group.add_option(
2367 '--min-free-space',
2368 type='int',
2369 metavar='NNN',
2370 default=2*1024*1024*1024,
2371 help='Trim if disk free space becomes lower than this value, '
2372 'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()
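
# A minimal usage sketch for the two cache helpers above. The cache options
# are meant to be paired with the isolate server options, since
# process_cache_options() reads options.namespace; the '/tmp/cache' path is
# illustrative:
#   parser = optparse.OptionParser()
#   add_isolate_server_options(parser)
#   add_cache_options(parser)
#   options, _ = parser.parse_args(['--cache', '/tmp/cache'])
#   cache = process_cache_options(options)  # DiskCache here; MemoryCache when
#                                           # --cache is not passed.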


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
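  # subcommand.CommandDispatcher looks up the CMD* functions defined in this
  # module (e.g. CMDdownload above) and dispatches on the first CLI argument.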
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))