#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.6.0'

import base64
import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tempfile
import threading
import time
import types
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of the isolate protocol passed to the server in the /handshake
# request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check with the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
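# E.g. (illustrative): with 200 items to look up, the successive queries
# contain 20, 20, 50, 50, 50 and finally the remaining 10 items; past that
# point every batch would hold 100 (see batch_items_for_check below).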


# A list of extensions for file types that are already compressed and should
# not receive any further compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from a network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
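# E.g. (illustrative): 'foo/bar.pyc' and 'src/.git' match the patterns above,
# while 'foo/bar.py' does not.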


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


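# Round-trip sketch for file_write/file_read (hypothetical path; illustrative
# only):
#
#   written = file_write(u'/tmp/example.bin', ['chunk1', 'chunk2'])
#   assert written == len(''.join(file_read(u'/tmp/example.bin')))

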
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (like
  # the standard library's open("test.txt")), name will end up being a str. We
  # want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime and causes a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest that ext2 can be faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # Read-only means none of the write permission bits is set.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


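# Usage sketch for putfile (hypothetical paths; illustrative only):
#
#   with fs.open(u'/cache/deadbeef', 'rb') as src:
#     putfile(src, u'/work/out.bin', file_mode=0500, use_symlink=False)

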
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


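# Round-trip sketch (in-memory; illustrative only):
#
#   chunks = ['hello ', 'world']
#   compressed = list(zip_compress(chunks, level=7))
#   assert ''.join(zip_decompress(compressed)) == 'hello world'

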
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot, while ALREADY_COMPRESSED_TYPES does not.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


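# E.g. (illustrative): 'movie.mp4' maps to level 0 (already compressed), while
# 'main.cc' maps to level 7.

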
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Compute the full set of directories that need to be created.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


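# E.g. (illustrative): for files ['a/b/c.txt', 'a/d.txt'], this creates
# directory 'a' and then 'a/b' under |base_directory|.

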
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


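# Digest preparation sketch (assuming SHA-1, the hash used by standard
# namespaces; illustrative only):
#
#   import hashlib
#   item = BufferItem('some data')
#   item.prepare(hashlib.sha1)
#   # item.digest is now the hex SHA-1 of the buffer; item.size == 9.

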
class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On a 32-bit userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of the isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage-specific information describing how to upload
    the item (for example, in case of cloud storage, signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


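# End-to-end upload sketch (hypothetical server URL and path; illustrative
# only):
#
#   api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with Storage(api) as storage:
#     storage.upload_items([FileItem(u'/path/to/file')])

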
def batch_items_for_check(items):
  """Splits the list of items to check for existence on the server into
  batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


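# Usage sketch (illustrative): items are sorted largest-first, so misses on
# big files are discovered as early as possible.
#
#   for batch in batch_items_for_check(items):
#     missing.update(storage_api.contains(batch))

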
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts an asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


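# Typical use of FetchQueue (illustrative only):
#
#   queue = FetchQueue(storage, cache)
#   queue.add(digest, size, priority=threading_utils.PRIORITY_HIGH)
#   queue.wait([digest])

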
class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions,
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


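# Typical wiring (see Storage.async_fetch; illustrative only):
#
#   stream = storage_api.fetch(digest)
#   verifier = FetchStreamVerifier(stream, expected_size)
#   file_write(path, verifier.run())

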
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by the /server_details
      endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that the server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # For entities stored inline in the DB.
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # For GS entities.
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is set, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from the |content| generator when retrying a
      # push. If |content| is indeed a generator, it cannot be rewound back to
      # the beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap the |content| generator with some sort of caching
      # restartable generator. It should be done alongside the streaming
      # support implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # On 32-bit python this is one byte less than 512mb. This is to cope
      # with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32-bit python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since that is what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling to one upload at once, we can survive. Otherwise,
          # kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to the isolate server. That way the isolate server can verify
        # that the data safely reached Google Storage (GS provides MD5 and
        # CRC32C of stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with a 'prepare' call. Storage does
    # that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
            {
              'digest': item.digest,
              'is_isolated': bool(item.high_priority),
              'size': item.size,
            } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hits',
        len(items), len(items) - len(missing_items))
    return missing_items

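  # Sketch of the preupload exchange assumed by contains() above; the shapes
  # are inferred from this file, not an authoritative API reference:
  #
  #   request:  {'items': [{'digest': ..., 'is_isolated': ..., 'size': ...}],
  #              'namespace': {...}}
  #   response: {'items': [{'index': '0', 'upload_ticket': ..., ...}]}
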
  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return an http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      Deserialized JSON response from the server: a dict with either a
      'content' key (embedded base64 data) or a 'url' key (Google Storage
      URL), or None on failure.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload.
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # Upload to GS.
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


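# Example (illustrative sketch): callers typically treat CacheMiss as a soft
# failure and fall back to the network; |cache| and |digest| are assumed to
# exist.
#
#   try:
#     with cache.getfileobj(digest) as f:
#       data = f.read()
#   except CacheMiss:
#     pass  # Re-fetch the item via Storage, then retry.

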
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file-like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value will
          make all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # |_evicted| is a list of evicted sizes, so record the length of the
        # content, not the content itself.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest


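# Example (minimal sketch): a MemoryCache round-trip. write() consumes an
# iterable of str chunks; getfileobj() returns a file-like object. The digest
# value here is a placeholder, not a real hash.
#
#   cache = MemoryCache()
#   cache.write('0123abcd', ['hello ', 'world'])
#   assert cache.getfileobj('0123abcd').read() == 'hello world'

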
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


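# Example (illustrative values only): a 20GiB cache that keeps at most 100000
# items and insists on 2GiB of free disk space.
#
#   policies = CachePolicies(
#       max_cache_size=20*1024*1024*1024,
#       min_free_space=2*1024*1024*1024,
#       max_items=100000)

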
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim=True):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
          It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    file_path.ensure_tree(self.cache_dir)
    # Current cached free disk space. It is updated by self._trim().
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the
    # LRU cache are also inherently protected. It could be a set() of all
    # items referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim=trim)

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

        logging.info(
            '%5d (%8dkb) added',
            len(self._added), sum(self._added) / 1024)
        logging.info(
            '%5d (%8dkb) current',
            len(self._lru),
            sum(self._lru.itervalues()) / 1024)
        logging.info(
            '%5d (%8dkb) evicted',
            len(self._evicted), sum(self._evicted) / 1024)
        logging.info(
            '        %8dkb free',
            self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded and trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last
    # cleanup() call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies that an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have the
    # write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns the digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns the timestamp of the last use of an item.

    Raises KeyError if the item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      self._trim()

  def _load(self, trim):
    """Loads the state of the cache from a json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know about and makes sure enough free space
    exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to map the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached values. self._trim() will be called later to
    # enforce real trimming, but doing this quick version here makes it
    # possible to map an isolated tree that is larger than the current amount
    # of free disk space when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


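# Example (illustrative sketch): a DiskCache used as a context manager so
# that trimming happens on exit. hashlib.sha1 stands in for the namespace's
# hash algorithm; |policies|, |digest| and |chunks| are assumed to exist.
#
#   import hashlib
#   with DiskCache(u'/tmp/isolate_cache', policies, hashlib.sha1) as cache:
#     if digest not in cache:
#       cache.write(digest, chunks)
#     with cache.getfileobj(digest) as f:
#       data = f.read()

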
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated file and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']

def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls
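
# Example (illustrative sketch): tests can inject a fake backend before
# get_storage() is first used; FakeStorageApi is a hypothetical subclass.
#
#   class FakeStorageApi(StorageApi):
#     ...  # Override fetch(), push(), contains(), etc.
#
#   set_storage_api_class(FakeStorageApi)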


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns a Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))

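# Example (illustrative sketch; the URL is a placeholder): Storage supports
# the context manager protocol, as used by upload_tree() below.
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     missing = storage.upload_items(items)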

def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skipping duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)

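# Example of the |infiles| pairs upload_tree() expects; the keys follow the
# .isolated file format used throughout this file ('h' digest, 's' size,
# 'l' symlink, optional 'priority'). Paths and digests are placeholders.
#
#   infiles = [
#     (u'/abs/path/foo.txt', {'h': '<digest>', 's': 123}),
#     (u'/abs/path/link', {'l': u'foo.txt'}),
#   ]
#   upload_tree('https://isolate.example.com', infiles, 'default-gzip')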

def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using the item in the cache
          # as the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'ar':
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r', filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle

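# Example (illustrative sketch): materializing an isolated tree into a fresh
# directory with an in-memory cache; |storage| comes from get_storage() and
# the hash is a placeholder.
#
#   bundle = fetch_isolated(
#       isolated_hash='<root hash>',
#       storage=storage,
#       cache=MemoryCache(),
#       outdir=u'/tmp/run',
#       use_symlinks=False)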

def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
    FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated'))
    for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


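# Example (illustrative sketch): archiving a directory. The first returned
# list pairs each input path with its hash; |cold| and |hot| split the items
# by whether they actually had to be uploaded.
#
#   results, cold, hot = archive_files_to_storage(
#       storage, [u'/abs/path/to/dir'], lambda _path: False)
#   isolated_hash = results[0][0]

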
def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


2274def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002275 """Download data from the server.
2276
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002277 It can either download individual files or a complete tree from a .isolated
2278 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002279 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002280 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002281 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002282 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002283 help='hash of an isolated file, .isolated file content is discarded, use '
2284 '--file if you need it')
2285 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002286 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2287 help='hash and destination of a file, can be used multiple times')
2288 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002289 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002290 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07002291 parser.add_option(
2292 '--use-symlinks', action='store_true',
2293 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002294 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002295 options, args = parser.parse_args(args)
2296 if args:
2297 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002298
nodir55be77b2016-05-03 09:39:57 -07002299 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002300 if bool(options.isolated) == bool(options.file):
2301 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07002302 if not options.cache and options.use_symlinks:
2303 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002304
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002305 cache = process_cache_options(options)
maruel2e8d0f52016-07-16 07:51:29 -07002306 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07002307 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002308 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07002309 if (fs.isfile(options.target) or
2310 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002311 parser.error(
2312 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002313 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002314 # Fetching individual files.
2315 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002316 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002317 channel = threading_utils.TaskChannel()
2318 pending = {}
2319 for digest, dest in options.file:
2320 pending[digest] = dest
2321 storage.async_fetch(
2322 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002323 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002324 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002325 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002326 functools.partial(file_write, os.path.join(options.target, dest)))
2327 while pending:
2328 fetched = channel.pull()
2329 dest = pending.pop(fetched)
2330 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002331
Vadim Shtayura3172be52013-12-03 12:49:05 -08002332 # Fetching whole isolated tree.
2333 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002334 with cache:
2335 bundle = fetch_isolated(
2336 isolated_hash=options.isolated,
2337 storage=storage,
2338 cache=cache,
maruel4409e302016-07-19 14:25:51 -07002339 outdir=options.target,
2340 use_symlinks=options.use_symlinks)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002341 if bundle.command:
2342 rel = os.path.join(options.target, bundle.relative_cwd)
2343 # |rel| is already rooted at options.target; joining it again is a no-op.
2344 print('To run this test, run it from the directory %s:' % rel)
2345 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002346
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002347 return 0
2348
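# Illustrative invocations of the download subcommand defined above (the
# server URL and hashes are placeholders):
#
#   # Fetch a whole tree described by an .isolated file into ./out:
#   isolateserver.py download -I https://isolate.example.com \
#       -s 1234567890abcdef1234567890abcdef12345678 -t out
#
#   # Fetch an individual file by hash into out/payload.bin:
#   isolateserver.py download -I https://isolate.example.com \
#       -f 1234567890abcdef1234567890abcdef12345678 payload.bin -t out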
2349
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002350def add_archive_options(parser):
2351 parser.add_option(
2352 '--blacklist',
2353 action='append', default=list(DEFAULT_BLACKLIST),
2354 help='List of regexps to use as a blacklist filter when uploading '
2355 'directories')
2356
2357
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002358def add_isolate_server_options(parser):
2359 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002360 parser.add_option(
2361 '-I', '--isolate-server',
2362 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002363 help='URL of the Isolate Server to use. Defaults to the environment '
2364 'variable ISOLATE_SERVER if set. There is no need to specify '
2365 'https://; it is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002366 parser.add_option(
2367 '--namespace', default='default-gzip',
2368 help='The namespace to use on the Isolate Server, default: %default')
2369
2370
nodir55be77b2016-05-03 09:39:57 -07002371def process_isolate_server_options(
2372 parser, options, set_exception_handler, required):
2373 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002374
2375 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002376 """
2377 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07002378 if required:
2379 parser.error('--isolate-server is required.')
2380 return
2381
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002382 try:
2383 options.isolate_server = net.fix_url(options.isolate_server)
2384 except ValueError as e:
2385 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002386 if set_exception_handler:
2387 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002388 try:
2389 return auth.ensure_logged_in(options.isolate_server)
2390 except ValueError as e:
2391 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002392
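# Illustrative effect of the URL handling above (an assumption based on the
# -I help text, which says https:// need not be specified): net.fix_url()
# would normalize a bare host such as 'myisolate.appspot.com' to
# 'https://myisolate.appspot.com', while a value it cannot parse raises
# ValueError, which is surfaced through parser.error().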
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002393
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002394def add_cache_options(parser):
2395 cache_group = optparse.OptionGroup(parser, 'Cache management')
2396 cache_group.add_option(
2397 '--cache', metavar='DIR',
2398 help='Directory to keep a local cache of the files. Accelerates download '
2399 'by reusing already downloaded files. Default=%default')
2400 cache_group.add_option(
2401 '--max-cache-size',
2402 type='int',
2403 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002404 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002405 help='Trim if the cache gets larger than this value, default=%default')
2406 cache_group.add_option(
2407 '--min-free-space',
2408 type='int',
2409 metavar='NNN',
2410 default=2*1024*1024*1024,
2411 help='Trim if disk free space becomes lower than this value, '
2412 'default=%default')
2413 cache_group.add_option(
2414 '--max-items',
2415 type='int',
2416 metavar='NNN',
2417 default=100000,
2418 help='Trim if more than this number of items are in the cache, '
2419 'default=%default')
2420 parser.add_option_group(cache_group)
2421
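# Example of combining the cache knobs above (all values are illustrative):
# keep the cache under 10 GiB and 50000 items, and preserve 1 GiB of free
# disk space:
#
#   isolateserver.py download --cache /tmp/isolate-cache \
#       --max-cache-size 10737418240 --min-free-space 1073741824 \
#       --max-items 50000 -I https://isolate.example.com -s <hash> -t out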
2422
nodirf33b8d62016-10-26 22:34:58 -07002423def process_cache_options(options, trim=True):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002424 if options.cache:
2425 policies = CachePolicies(
2426 options.max_cache_size, options.min_free_space, options.max_items)
2427
2428 # |options.cache| path may not exist until DiskCache() instance is created.
2429 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002430 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002431 policies,
nodirf33b8d62016-10-26 22:34:58 -07002432 isolated_format.get_hash_algo(options.namespace),
2433 trim=trim)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002434 else:
2435 return MemoryCache()
2436
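# Sketch of the intended call pattern (it mirrors CMDdownload above): the
# returned cache object is used as a context manager around fetches.
#
#   cache = process_cache_options(options)
#   with cache:
#     bundle = fetch_isolated(..., cache=cache, ...)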
2437
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002438class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002439 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002440 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002441 self,
2442 version=__version__,
2443 prog=os.path.basename(sys.modules[__name__].__file__),
2444 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002445 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002446
2447 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002448 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002449 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002450 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002451 return options, args
2452
2453
2454def main(args):
2455 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002456 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002457
2458
2459if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07002460 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002461 fix_encoding.fix_encoding()
2462 tools.disable_buffering()
2463 colorama.init()
maruel4409e302016-07-19 14:25:51 -07002464 file_path.enable_symlink()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002465 sys.exit(main(sys.argv[1:]))