blob: 1c04303a334a75e1cf994bd67823f5a5ec7535bb [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
tansell26de79e2016-11-13 18:41:11 -08008__version__ = '0.7.0'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
nodir90bc8dc2016-06-15 13:35:21 -070011import errno
tansell9e04a8d2016-07-28 09:31:59 -070012import functools
13import io
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040015import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040018import signal
tansell9e04a8d2016-07-28 09:31:59 -070019import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import sys
tansell26de79e2016-11-13 18:41:11 -080021import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050022import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000023import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050025import types
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000026import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000027
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000028from third_party import colorama
29from third_party.depot_tools import fix_encoding
30from third_party.depot_tools import subcommand
31
tanselle4288c32016-07-28 09:45:40 -070032from libs import arfile
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050033from utils import file_path
maruel12e30012015-10-09 11:55:35 -070034from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040035from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040036from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000037from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040038from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070039from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000040from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000041from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000042
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080043import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040044import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080045
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000046
# Version of the isolate protocol passed to the server in the /handshake
# request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): entries are bare extensions without a leading dot.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from a network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of paths that should be ignored when archiving by default.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
110
111
class Error(Exception):
  """Base class for all runtime errors raised by this module."""
115
116
class Aborted(Error):
  """Raised when an operation is cancelled before it completes."""
120
121
class AlreadyExists(Error):
  """Raised when the destination file already exists on disk."""
125
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Reads the file at |path| and yields its content |chunk_size| bytes at a
  time, starting at byte |offset|.
  """
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    while True:
      piece = stream.read(chunk_size)
      if not piece:
        return
      yield piece
136
137
def file_write(path, content_generator):
  """Writes the chunks produced by |content_generator| to the file at |path|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      written += len(chunk)
      out.write(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000154
155
def fileobj_path(fileobj):
  """Returns the file system path backing a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  path = getattr(fileobj, 'name', None)
  if path is None:
    return None

  # A file created via something like open("test.txt") exposes its name as a
  # str (the standard library is outside our control). All paths handled here
  # must be unicode objects, so decode it.
  if not isinstance(path, unicode):
    path = path.decode(sys.getfilesystemencoding())

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(path):
    return None

  if fs.exists(path):
    return path
  return None
180
181
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  When |size| is given, exactly that many bytes are copied; running out of
  source data earlier raises IOError. With the default size of -1, everything
  up to the EOF marker is copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    to_read = min(chunk_size, size - copied) if size > 0 else chunk_size
    buf = srcfileobj.read(to_read)
    if not buf:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(buf)
    copied += len(buf)
209
210
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # The file is read only iff none of the user/group/other write bits are
    # set. BUG FIX: the previous expression was missing the negation, so
    # writable files were considered "read only" and linked into place (risking
    # cache corruption through later modification) while genuinely read-only
    # files were needlessly copied.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
273
274
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  |level| is the zlib compression level (0 = store, 9 = best).
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
285
286
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Yields decompressed data for the zlib stream in |content_generator|.

  Decompresses in bounded steps (no output chunk larger than |chunk_size|) so
  that a zip bomb cannot force zlib to preallocate huge amounts of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  seen = 0
  try:
    for piece in content_generator:
      seen += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain any input zlib held back to honor the chunk_size cap.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (seen, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
317
318
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store) for already-compressed file types, 7 otherwise.
  """
  # BUG FIX: os.path.splitext() keeps the leading dot ('.zip') while entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('zip'), so the membership
  # test below could never match and level 0 was never selected. Strip the dot.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
324
325
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Gather every ancestor directory referenced by the file paths.
  needed = set()
  for f in files:
    ancestor = os.path.dirname(f)
    while ancestor and ancestor not in needed:
      needed.add(ancestor)
      ancestor = os.path.dirname(ancestor)
  # Sorted order guarantees parents are created before their children.
  for rel in sorted(needed):
    abs_dir = os.path.join(base_directory, rel)
    if not fs.isdir(abs_dir):
      fs.mkdir(abs_dir)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000340
341
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % outfile)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000358
359
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  on_disk = fs.stat(path).st_size
  if on_disk == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), on_disk, size)
  return False
374
375
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Content digest (hex string) or None until prepare() computes it.
    self.digest = digest
    # Content length in bytes or None until prepare() computes it.
    self.size = size
    # True to upload ahead of regular items.
    self.high_priority = high_priority
    # Default zlib compression level; subclasses may override per file type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000415
416
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Stat the file only when the caller did not supply the size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000434
435
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # Deliberately a list (not a generator) so the content is re-iterable.
    return [self.buffer]
445
446
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000447class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800448 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000449
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800450 Implements compression support, parallel 'contains' checks, parallel uploads
451 and more.
452
453 Works only within single namespace (and thus hashing algorithm and compression
454 scheme are fixed).
455
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400456 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
457 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800458 """
459
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700460 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000461 self._storage_api = storage_api
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400462 self._use_zip = isolated_format.is_namespace_with_compression(
463 storage_api.namespace)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400464 self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000465 self._cpu_thread_pool = None
466 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400467 self._aborted = False
468 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000469
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo
477
  @property
  def location(self):
    """URL of the backing store that this class is using.

    Delegates to the underlying StorageApi instance.
    """
    return self._storage_api.location
482
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace
490
491 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000492 def cpu_thread_pool(self):
493 """ThreadPool for CPU-bound tasks like zipping."""
494 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500495 threads = max(threading_utils.num_processors(), 2)
496 if sys.maxsize <= 2L**32:
497 # On 32 bits userland, do not try to use more than 16 threads.
498 threads = min(threads, 16)
499 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000500 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000501
  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError.

    Created lazily on first access.
    """
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000508
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000509 def close(self):
510 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400511 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000512 if self._cpu_thread_pool:
513 self._cpu_thread_pool.join()
514 self._cpu_thread_pool.close()
515 self._cpu_thread_pool = None
516 if self._net_thread_pool:
517 self._net_thread_pool.join()
518 self._net_thread_pool.close()
519 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400520 logging.info('Done.')
521
522 def abort(self):
523 """Cancels any pending or future operations."""
524 # This is not strictly theadsafe, but in the worst case the logging message
525 # will be printed twice. Not a big deal. In other places it is assumed that
526 # unprotected reads and writes to _aborted are serializable (it is true
527 # for python) and thus no locking is used.
528 if not self._aborted:
529 logging.warning('Aborting... It can take a while.')
530 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000531
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000532 def __enter__(self):
533 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400534 assert not self._prev_sig_handlers, self._prev_sig_handlers
535 for s in (signal.SIGINT, signal.SIGTERM):
536 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000537 return self
538
539 def __exit__(self, _exc_type, _exc_value, _traceback):
540 """Context manager interface."""
541 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400542 while self._prev_sig_handlers:
543 s, h = self._prev_sig_handlers.popitem()
544 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000545 return False
546
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    # NOTE(review): an empty |items| list would make total == 0 and raise
    # ZeroDivisionError in the percentage computations below — confirm callers
    # never pass an empty list.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
621
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the zip failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      # Hand the compressed payload off to the network pool for upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000673
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800674 def push(self, item, push_state):
675 """Synchronously pushes a single item to the server.
676
677 If you need to push many items at once, consider using 'upload_items' or
678 'async_push' with instance of TaskChannel.
679
680 Arguments:
681 item: item to upload as instance of Item class.
682 push_state: push state returned by 'get_missing_items' call for |item|.
683
684 Returns:
685 Pushed item (same object as |item|).
686 """
687 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700688 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800689 self.async_push(channel, item, push_state)
690 pushed = channel.pull()
691 assert pushed is item
692 return item
693
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000694 def async_fetch(self, channel, priority, digest, size, sink):
695 """Starts asynchronous fetch from the server in a parallel thread.
696
697 Arguments:
698 channel: TaskChannel that receives back |digest| when download ends.
699 priority: thread pool task priority for the fetch.
700 digest: hex digest of an item to download.
701 size: expected size of the item (after decompression).
702 sink: function that will be called as sink(generator).
703 """
704 def fetch():
705 try:
706 # Prepare reading pipeline.
707 stream = self._storage_api.fetch(digest)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700708 if self._use_zip:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400709 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000710 # Run |stream| through verifier that will assert its size.
711 verifier = FetchStreamVerifier(stream, size)
712 # Verified stream goes to |sink|.
713 sink(verifier.run())
714 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800715 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000716 raise
717 return digest
718
719 # Don't bother with zip_thread_pool for decompression. Decompression is
720 # really fast and most probably IO bound anyway.
721 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
722
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000723 def get_missing_items(self, items):
724 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000725
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000726 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000727
728 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000729 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000730
731 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800732 For each missing item it yields a pair (item, push_state), where:
733 * item - Item object that is missing (one of |items|).
734 * push_state - opaque object that contains storage specific information
735 describing how to upload the item (for example in case of cloud
736 storage, it is signed upload URLs). It can later be passed to
737 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000738 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000739 channel = threading_utils.TaskChannel()
740 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800741
742 # Ensure all digests are calculated.
743 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700744 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800745
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400746 def contains(batch):
747 if self._aborted:
748 raise Aborted()
749 return self._storage_api.contains(batch)
750
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000751 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800752 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400753 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400754 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000755 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800756
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000757 # Yield results as they come in.
758 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800759 for missing_item, push_state in channel.pull().iteritems():
760 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000761
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000762
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800763def batch_items_for_check(items):
764 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000765
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800766 Each batch corresponds to a single 'exists?' query to the server via a call
767 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000768
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800769 Arguments:
770 items: a list of Item objects.
771
772 Yields:
773 Batches of items to query for existence in a single operation,
774 each batch is a list of Item objects.
775 """
776 batch_count = 0
777 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
778 next_queries = []
779 for item in sorted(items, key=lambda x: x.size, reverse=True):
780 next_queries.append(item)
781 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000782 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800783 next_queries = []
784 batch_count += 1
785 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
786 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
787 if next_queries:
788 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000789
790
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000791class FetchQueue(object):
792 """Fetches items from Storage and places them into LocalCache.
793
794 It manages multiple concurrent fetch operations. Acts as a bridge between
795 Storage and LocalCache so that Storage and LocalCache don't depend on each
796 other at all.
797 """
798
799 def __init__(self, storage, cache):
800 self.storage = storage
801 self.cache = cache
802 self._channel = threading_utils.TaskChannel()
803 self._pending = set()
804 self._accessed = set()
805 self._fetched = cache.cached_set()
806
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400807 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700808 self,
809 digest,
810 size=UNKNOWN_FILE_SIZE,
811 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000812 """Starts asynchronous fetch of item |digest|."""
813 # Fetching it now?
814 if digest in self._pending:
815 return
816
817 # Mark this file as in use, verify_all_cached will later ensure it is still
818 # in cache.
819 self._accessed.add(digest)
820
821 # Already fetched? Notify cache to update item's LRU position.
822 if digest in self._fetched:
823 # 'touch' returns True if item is in cache and not corrupted.
824 if self.cache.touch(digest, size):
825 return
826 # Item is corrupted, remove it from cache and fetch it again.
827 self._fetched.remove(digest)
828 self.cache.evict(digest)
829
830 # TODO(maruel): It should look at the free disk space, the current cache
831 # size and the size of the new item on every new item:
832 # - Trim the cache as more entries are listed when free disk space is low,
833 # otherwise if the amount of data downloaded during the run > free disk
834 # space, it'll crash.
835 # - Make sure there's enough free disk space to fit all dependencies of
836 # this run! If not, abort early.
837
838 # Start fetching.
839 self._pending.add(digest)
840 self.storage.async_fetch(
841 self._channel, priority, digest, size,
842 functools.partial(self.cache.write, digest))
843
844 def wait(self, digests):
845 """Starts a loop that waits for at least one of |digests| to be retrieved.
846
847 Returns the first digest retrieved.
848 """
849 # Flush any already fetched items.
850 for digest in digests:
851 if digest in self._fetched:
852 return digest
853
854 # Ensure all requested items are being fetched now.
855 assert all(digest in self._pending for digest in digests), (
856 digests, self._pending)
857
858 # Wait for some requested item to finish fetching.
859 while self._pending:
860 digest = self._channel.pull()
861 self._pending.remove(digest)
862 self._fetched.add(digest)
863 if digest in digests:
864 return digest
865
866 # Should never reach this point due to assert above.
867 raise RuntimeError('Impossible state')
868
869 def inject_local_file(self, path, algo):
870 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700871 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000872 data = f.read()
873 digest = algo(data).hexdigest()
874 self.cache.write(digest, [data])
875 self._fetched.add(digest)
876 return digest
877
878 @property
879 def pending_count(self):
880 """Returns number of items to be fetched."""
881 return len(self._pending)
882
883 def verify_all_cached(self):
884 """True if all accessed items are in cache."""
885 return self._accessed.issubset(self.cache.cached_set())
886
887
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # A stream must be provided; an expected size of UNKNOWN_FILE_SIZE
    # disables the final size check.
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep one chunk buffered in |pending| so the final chunk is handed out
    # only once the whole stream has been validated.
    pending = None
    for chunk in self.stream:
      assert chunk is not None
      if pending is not None:
        self._inspect_chunk(pending, is_last=False)
        try:
          yield pending
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      pending = chunk
    if pending is not None:
      self._inspect_chunk(pending, is_last=True)
      try:
        yield pending
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if is_last and self.expected_size not in (
        UNKNOWN_FILE_SIZE, self.current_size):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
935
936
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000937class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800938 """Interface for classes that implement low-level storage operations.
939
940 StorageApi is oblivious of compression and hashing scheme used. This details
941 are handled in higher level Storage class.
942
943 Clients should generally not use StorageApi directly. Storage class is
944 preferred since it implements compression and upload optimizations.
945 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000946
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700947 @property
948 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500949 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700950 raise NotImplementedError()
951
952 @property
953 def namespace(self):
954 """Isolate namespace used by this storage.
955
956 Indirectly defines hashing scheme and compression method used.
957 """
958 raise NotImplementedError()
959
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800960 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000961 """Fetches an object and yields its content.
962
963 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000964 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800965 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000966
967 Yields:
968 Chunks of downloaded item (as str objects).
969 """
970 raise NotImplementedError()
971
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800972 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000973 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000974
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800975 |item| MUST go through 'contains' call to get |push_state| before it can
976 be pushed to the storage.
977
978 To be clear, here is one possible usage:
979 all_items = [... all items to push as Item subclasses ...]
980 for missing_item, push_state in storage_api.contains(all_items).items():
981 storage_api.push(missing_item, push_state)
982
983 When pushing to a namespace with compression, data that should be pushed
984 and data provided by the item is not the same. In that case |content| is
985 not None and it yields chunks of compressed data (using item.content() as
986 a source of original uncompressed data). This is implemented by Storage
987 class.
988
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000989 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000990 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800991 push_state: push state object as returned by 'contains' call.
992 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000993
994 Returns:
995 None.
996 """
997 raise NotImplementedError()
998
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000999 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001000 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001001
1002 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001003 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001004
1005 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001006 A dict missing Item -> opaque push state object to be passed to 'push'.
1007 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001008 """
1009 raise NotImplementedError()
1010
1011
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001012class _IsolateServerPushState(object):
1013 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -05001014
1015 Note this needs to be a global class to support pickling.
1016 """
1017
Cory Massarocc19c8c2015-03-10 13:35:11 -07001018 def __init__(self, preupload_status, size):
1019 self.preupload_status = preupload_status
1020 gs_upload_url = preupload_status.get('gs_upload_url') or None
1021 if gs_upload_url:
1022 self.upload_url = gs_upload_url
maruel380e3262016-08-31 16:10:06 -07001023 self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
Cory Massarocc19c8c2015-03-10 13:35:11 -07001024 else:
maruel380e3262016-08-31 16:10:06 -07001025 self.upload_url = 'api/isolateservice/v1/store_inline'
Cory Massarocc19c8c2015-03-10 13:35:11 -07001026 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -05001027 self.uploaded = False
1028 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001029 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -05001030
1031
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001032class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001033 """StorageApi implementation that downloads and uploads to Isolate Server.
1034
1035 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001036 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001037 """
1038
maruel@chromium.org3e42ce82013-09-12 18:36:59 +00001039 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001040 super(IsolateServer, self).__init__()
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001041 assert file_path.is_url(base_url), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001042 self._base_url = base_url.rstrip('/')
1043 self._namespace = namespace
Cory Massarocc19c8c2015-03-10 13:35:11 -07001044 self._namespace_dict = {
1045 'compression': 'flate' if namespace.endswith(
1046 ('-gzip', '-flate')) else '',
1047 'digest_hash': 'sha-1',
1048 'namespace': namespace,
1049 }
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001050 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001051 self._server_caps = None
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001052 self._memory_use = 0
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001053
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001054 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001055 def _server_capabilities(self):
Cory Massarocc19c8c2015-03-10 13:35:11 -07001056 """Gets server details.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001057
1058 Returns:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001059 Server capabilities dictionary as returned by /server_details endpoint.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001060 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +00001061 # TODO(maruel): Make this request much earlier asynchronously while the
1062 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001063
1064 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
1065 # namespace-level ACLs to this call.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001066
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001067 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001068 if self._server_caps is None:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001069 self._server_caps = net.url_read_json(
maruel380e3262016-08-31 16:10:06 -07001070 url='%s/api/isolateservice/v1/server_details' % self._base_url,
Cory Massarocc19c8c2015-03-10 13:35:11 -07001071 data={})
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001072 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001073
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001074 @property
1075 def location(self):
1076 return self._base_url
1077
  @property
  def namespace(self):
    """Isolate namespace used by this storage (see StorageApi.namespace)."""
    return self._namespace
1081
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001082 def fetch(self, digest, offset=0):
Cory Massarocc19c8c2015-03-10 13:35:11 -07001083 assert offset >= 0
maruel380e3262016-08-31 16:10:06 -07001084 source_url = '%s/api/isolateservice/v1/retrieve' % (
Cory Massarocc19c8c2015-03-10 13:35:11 -07001085 self._base_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001086 logging.debug('download_file(%s, %d)', source_url, offset)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001087 response = self.do_fetch(source_url, digest, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001088
Cory Massarocc19c8c2015-03-10 13:35:11 -07001089 if not response:
maruele154f9c2015-09-14 11:03:15 -07001090 raise IOError(
1091 'Attempted to fetch from %s; no data exist: %s / %s.' % (
1092 source_url, self._namespace, digest))
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001093
Cory Massarocc19c8c2015-03-10 13:35:11 -07001094 # for DB uploads
1095 content = response.get('content')
1096 if content is not None:
maruel863ac262016-03-17 11:00:37 -07001097 yield base64.b64decode(content)
1098 return
Cory Massarocc19c8c2015-03-10 13:35:11 -07001099
1100 # for GS entities
1101 connection = net.url_open(response['url'])
maruelf5574752015-09-17 13:40:27 -07001102 if not connection:
1103 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
Cory Massarocc19c8c2015-03-10 13:35:11 -07001104
1105 # If |offset|, verify server respects it by checking Content-Range.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001106 if offset:
1107 content_range = connection.get_header('Content-Range')
1108 if not content_range:
1109 raise IOError('Missing Content-Range header')
1110
1111 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1112 # According to a spec, <size> can be '*' meaning "Total size of the file
1113 # is not known in advance".
1114 try:
1115 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1116 if not match:
1117 raise ValueError()
1118 content_offset = int(match.group(1))
1119 last_byte_index = int(match.group(2))
1120 size = None if match.group(3) == '*' else int(match.group(3))
1121 except ValueError:
1122 raise IOError('Invalid Content-Range header: %s' % content_range)
1123
1124 # Ensure returned offset equals requested one.
1125 if offset != content_offset:
1126 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1127 offset, content_offset, content_range))
1128
1129 # Ensure entire tail of the file is returned.
1130 if size is not None and last_byte_index + 1 != size:
1131 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1132
maruel863ac262016-03-17 11:00:37 -07001133 for data in connection.iter_content(NET_IO_FILE_CHUNK):
1134 yield data
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001135
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001136 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001137 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001138 assert item.digest is not None
1139 assert item.size is not None
1140 assert isinstance(push_state, _IsolateServerPushState)
1141 assert not push_state.finalized
1142
1143 # Default to item.content().
1144 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001145 logging.info('Push state size: %d', push_state.size)
1146 if isinstance(content, (basestring, list)):
1147 # Memory is already used, too late.
1148 with self._lock:
1149 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001150 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001151 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1152 # If |content| is indeed a generator, it can not be re-winded back to the
1153 # beginning of the stream. A retry will find it exhausted. A possible
1154 # solution is to wrap |content| generator with some sort of caching
1155 # restartable generator. It should be done alongside streaming support
1156 # implementation.
1157 #
1158 # In theory, we should keep the generator, so that it is not serialized in
1159 # memory. Sadly net.HttpService.request() requires the body to be
1160 # serialized.
1161 assert isinstance(content, types.GeneratorType), repr(content)
1162 slept = False
1163 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001164 # One byte less than 512mb. This is to cope with incompressible content.
1165 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001166 while True:
1167 with self._lock:
1168 # This is due to 32 bits python when uploading very large files. The
1169 # problem is that it's comparing uncompressed sizes, while we care
1170 # about compressed sizes since it's what is serialized in memory.
1171 # The first check assumes large files are compressible and that by
1172 # throttling one upload at once, we can survive. Otherwise, kaboom.
1173 memory_use = self._memory_use
1174 if ((push_state.size >= max_size and not memory_use) or
1175 (memory_use + push_state.size <= max_size)):
1176 self._memory_use += push_state.size
1177 memory_use = self._memory_use
1178 break
1179 time.sleep(0.1)
1180 slept = True
1181 if slept:
1182 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001183
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001184 try:
1185 # This push operation may be a retry after failed finalization call below,
1186 # no need to reupload contents in that case.
1187 if not push_state.uploaded:
1188 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001189 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001190 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001191 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001192 item.digest, push_state.upload_url))
1193 push_state.uploaded = True
1194 else:
1195 logging.info(
1196 'A file %s already uploaded, retrying finalization only',
1197 item.digest)
1198
1199 # Optionally notify the server that it's done.
1200 if push_state.finalize_url:
1201 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1202 # send it to isolated server. That way isolate server can verify that
1203 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1204 # stored files).
1205 # TODO(maruel): Fix the server to accept properly data={} so
1206 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001207 response = net.url_read_json(
1208 url='%s/%s' % (self._base_url, push_state.finalize_url),
1209 data={
1210 'upload_ticket': push_state.preupload_status['upload_ticket'],
1211 })
1212 if not response or not response['ok']:
1213 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001214 push_state.finalized = True
1215 finally:
1216 with self._lock:
1217 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001218
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001219 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001220 # Ensure all items were initialized with 'prepare' call. Storage does that.
1221 assert all(i.digest is not None and i.size is not None for i in items)
1222
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001223 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001224 body = {
1225 'items': [
1226 {
1227 'digest': item.digest,
1228 'is_isolated': bool(item.high_priority),
1229 'size': item.size,
1230 } for item in items
1231 ],
1232 'namespace': self._namespace_dict,
1233 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001234
maruel380e3262016-08-31 16:10:06 -07001235 query_url = '%s/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001236
1237 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001238 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001239 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001240 response = net.url_read_json(url=query_url, data=body)
1241 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001242 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001243 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001244 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001245 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001246 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001247
1248 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001249 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001250 for preupload_status in response.get('items', []):
1251 assert 'upload_ticket' in preupload_status, (
1252 preupload_status, '/preupload did not generate an upload ticket')
1253 index = int(preupload_status['index'])
1254 missing_items[items[index]] = _IsolateServerPushState(
1255 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001256 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001257 len(items), len(items) - len(missing_items))
1258 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001259
Cory Massarocc19c8c2015-03-10 13:35:11 -07001260 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001261 """Fetches isolated data from the URL.
1262
1263 Used only for fetching files, not for API calls. Can be overridden in
1264 subclasses.
1265
1266 Args:
1267 url: URL to fetch the data from, can possibly return http redirect.
1268 offset: byte offset inside the file to start fetching from.
1269
1270 Returns:
1271 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1272 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001273 assert isinstance(offset, int)
1274 data = {
1275 'digest': digest.encode('utf-8'),
1276 'namespace': self._namespace_dict,
1277 'offset': offset,
1278 }
maruel0c25f4f2015-12-15 05:41:17 -08001279 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1280 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001281 return net.url_read_json(
1282 url=url,
1283 data=data,
1284 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001285
Cory Massarocc19c8c2015-03-10 13:35:11 -07001286 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001287 """Uploads isolated file to the URL.
1288
1289 Used only for storing files, not for API calls. Can be overridden in
1290 subclasses.
1291
1292 Args:
1293 url: URL to upload the data to.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001294 push_state: an _IsolateServicePushState instance
1295 item: the original Item to be uploaded
Vadim Shtayura8623c272014-12-01 11:45:27 -08001296 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001297 """
1298 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1299 # upload support is implemented.
1300 if isinstance(content, list) and len(content) == 1:
1301 content = content[0]
1302 else:
1303 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001304
1305 # DB upload
1306 if not push_state.finalize_url:
1307 url = '%s/%s' % (self._base_url, push_state.upload_url)
1308 content = base64.b64encode(content)
1309 data = {
1310 'upload_ticket': push_state.preupload_status['upload_ticket'],
1311 'content': content,
1312 }
1313 response = net.url_read_json(url=url, data=data)
1314 return response is not None and response['ok']
1315
1316 # upload to GS
1317 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001318 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001319 content_type='application/octet-stream',
1320 data=content,
1321 method='PUT',
tandriib44d54d2016-02-10 11:31:41 -08001322 headers={'Cache-Control': 'public, max-age=31536000'},
Cory Massarocc19c8c2015-03-10 13:35:11 -07001323 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001324 return response is not None
1325
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001326
nodir445097b2016-06-03 22:50:26 -07001327class CacheMiss(Exception):
1328 """Raised when an item is not in cache."""
1329
1330 def __init__(self, digest):
1331 self.digest = digest
1332 super(CacheMiss, self).__init__(
1333 'Item with digest %r is not found in cache' % digest)
1334
1335
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001336class LocalCache(object):
1337 """Local cache that stores objects fetched via Storage.
1338
1339 It can be accessed concurrently from multiple threads, so it should protect
1340 its internal state with some lock.
1341 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001342 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001343
maruel064c0a32016-04-05 11:47:15 -07001344 def __init__(self):
1345 self._lock = threading_utils.LockWithAssert()
1346 # Profiling values.
1347 self._added = []
1348 self._initial_number_items = 0
1349 self._initial_size = 0
1350 self._evicted = []
tansell9e04a8d2016-07-28 09:31:59 -07001351 self._used = []
maruel064c0a32016-04-05 11:47:15 -07001352
nodirbe642ff2016-06-09 15:51:51 -07001353 def __contains__(self, digest):
1354 raise NotImplementedError()
1355
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001356 def __enter__(self):
1357 """Context manager interface."""
1358 return self
1359
1360 def __exit__(self, _exc_type, _exec_value, _traceback):
1361 """Context manager interface."""
1362 return False
1363
maruel064c0a32016-04-05 11:47:15 -07001364 @property
1365 def added(self):
1366 return self._added[:]
1367
1368 @property
1369 def evicted(self):
1370 return self._evicted[:]
1371
1372 @property
tansell9e04a8d2016-07-28 09:31:59 -07001373 def used(self):
1374 return self._used[:]
1375
1376 @property
maruel064c0a32016-04-05 11:47:15 -07001377 def initial_number_items(self):
1378 return self._initial_number_items
1379
1380 @property
1381 def initial_size(self):
1382 return self._initial_size
1383
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001384 def cached_set(self):
1385 """Returns a set of all cached digests (always a new object)."""
1386 raise NotImplementedError()
1387
maruel36a963d2016-04-08 17:15:49 -07001388 def cleanup(self):
1389 """Deletes any corrupted item from the cache and trims it if necessary."""
1390 raise NotImplementedError()
1391
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001392 def touch(self, digest, size):
1393 """Ensures item is not corrupted and updates its LRU position.
1394
1395 Arguments:
1396 digest: hash digest of item to check.
1397 size: expected size of this item.
1398
1399 Returns:
1400 True if item is in cache and not corrupted.
1401 """
1402 raise NotImplementedError()
1403
1404 def evict(self, digest):
1405 """Removes item from cache if it's there."""
1406 raise NotImplementedError()
1407
tansell9e04a8d2016-07-28 09:31:59 -07001408 def getfileobj(self, digest):
1409 """Returns a readable file like object.
1410
1411 If file exists on the file system it will have a .name attribute with an
1412 absolute path to the file.
1413 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001414 raise NotImplementedError()
1415
1416 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001417 """Reads data from |content| generator and stores it in cache.
1418
1419 Returns digest to simplify chaining.
1420 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001421 raise NotImplementedError()
1422
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001423
1424class MemoryCache(LocalCache):
1425 """LocalCache implementation that stores everything in memory."""
1426
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001427 def __init__(self, file_mode_mask=0500):
1428 """Args:
1429 file_mode_mask: bit mask to AND file mode with. Default value will make
1430 all mapped files to be read only.
1431 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001432 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001433 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001434 self._contents = {}
1435
nodirbe642ff2016-06-09 15:51:51 -07001436 def __contains__(self, digest):
1437 with self._lock:
1438 return digest in self._contents
1439
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001440 def cached_set(self):
1441 with self._lock:
1442 return set(self._contents)
1443
maruel36a963d2016-04-08 17:15:49 -07001444 def cleanup(self):
1445 pass
1446
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001447 def touch(self, digest, size):
1448 with self._lock:
1449 return digest in self._contents
1450
1451 def evict(self, digest):
1452 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001453 v = self._contents.pop(digest, None)
1454 if v is not None:
1455 self._evicted.add(v)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001456
tansell9e04a8d2016-07-28 09:31:59 -07001457 def getfileobj(self, digest):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001458 with self._lock:
nodir445097b2016-06-03 22:50:26 -07001459 try:
tansell9e04a8d2016-07-28 09:31:59 -07001460 d = self._contents[digest]
nodir445097b2016-06-03 22:50:26 -07001461 except KeyError:
1462 raise CacheMiss(digest)
tansell9e04a8d2016-07-28 09:31:59 -07001463 self._used.append(len(d))
1464 return io.BytesIO(d)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001465
1466 def write(self, digest, content):
1467 # Assemble whole stream before taking the lock.
1468 data = ''.join(content)
1469 with self._lock:
1470 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001471 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001472 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001473
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001474
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001475class CachePolicies(object):
1476 def __init__(self, max_cache_size, min_free_space, max_items):
1477 """
1478 Arguments:
1479 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1480 cache is effectively a leak.
1481 - min_free_space: Trim if disk free space becomes lower than this value. If
1482 0, it unconditionally fill the disk.
1483 - max_items: Maximum number of items to keep in the cache. If 0, do not
1484 enforce a limit.
1485 """
1486 self.max_cache_size = max_cache_size
1487 self.min_free_space = min_free_space
1488 self.max_items = max_items
1489
1490
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Each item is stored as a single file named after its digest, directly
  inside cache_dir. Saves its state as json file.
  """
  # Name of the json file, inside cache_dir, holding the serialized LRU state.
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim=True):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      algo: hashing algorithm used.
      trim: if True to enforce |policies| right away.
        It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim=trim)

  def __contains__(self, digest):
    """True if the digest is tracked in the LRU state."""
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface: trims the cache and logs usage stats."""
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          ' %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    """Returns a new set of all tracked digests."""
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there is no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid.

    Note that is doesn't compute the hash so it could still be corrupted if the
    file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update it's LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      # Mark as referenced so _remove_lru_file(False) will refuse to evict it.
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    """Removes an item from both the LRU state and the file system."""
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    """Returns an open file object for the item; raises CacheMiss if absent.

    Records the item's size in self._used for profiling.
    """
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    """Streams |content| chunks into a read-only file named after |digest|.

    Returns digest to simplify chaining.
    """
    assert content is not None
    with self._lock:
      # Mark as referenced so _remove_lru_file(False) will refuse to evict it.
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      # 1) Inside |content| generator in case of network or unzipping errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      self._trim()

  def _load(self, trim):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      # NOTE(review): uses os.path.isdir while the rest of the class goes
      # through the fs wrapper; presumably equivalent here — confirm.
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size.

    Raises Error when nothing can be removed, or when the candidate is the
    protected item and allow_protected is False.
    """
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to map the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug("Removing LRU file %s", digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space when
    # the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1828
1829
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001830class IsolatedBundle(object):
1831 """Fetched and parsed .isolated file with all dependencies."""
1832
Vadim Shtayura3148e072014-09-02 18:51:52 -07001833 def __init__(self):
1834 self.command = []
1835 self.files = {}
1836 self.read_only = None
1837 self.relative_cwd = None
1838 # The main .isolated file, a IsolatedFile instance.
1839 self.root = None
1840
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001841 def fetch(self, fetch_queue, root_isolated_hash, algo):
1842 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001843
1844 It enables support for "included" .isolated files. They are processed in
1845 strict order but fetched asynchronously from the cache. This is important so
1846 that a file in an included .isolated file that is overridden by an embedding
1847 .isolated file is not fetched needlessly. The includes are fetched in one
1848 pass and the files are fetched as soon as all the ones on the left-side
1849 of the tree were fetched.
1850
1851 The prioritization is very important here for nested .isolated files.
1852 'includes' have the highest priority and the algorithm is optimized for both
1853 deep and wide trees. A deep one is a long link of .isolated files referenced
1854 one at a time by one item in 'includes'. A wide one has a large number of
1855 'includes' in a single .isolated file. 'left' is defined as an included
1856 .isolated file earlier in the 'includes' list. So the order of the elements
1857 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001858
1859 As a side effect this method starts asynchronous fetch of all data files
1860 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1861 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001862 """
1863 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1864
1865 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1866 pending = {}
1867 # Set of hashes of already retrieved items to refuse recursive includes.
1868 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001869 # Set of IsolatedFile's whose data files have already being fetched.
1870 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001871
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001872 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001873 h = isolated_file.obj_hash
1874 if h in seen:
1875 raise isolated_format.IsolatedError(
1876 'IsolatedFile %s is retrieved recursively' % h)
1877 assert h not in pending
1878 seen.add(h)
1879 pending[h] = isolated_file
1880 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1881
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001882 # Start fetching root *.isolated file (single file, not the whole bundle).
1883 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001884
1885 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001886 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001887 item_hash = fetch_queue.wait(pending)
1888 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001889 with fetch_queue.cache.getfileobj(item_hash) as f:
1890 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001891
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001892 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001893 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001894 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001895
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001896 # Always fetch *.isolated files in traversal order, waiting if necessary
1897 # until next to-be-processed node loads. "Waiting" is done by yielding
1898 # back to the outer loop, that waits until some *.isolated is loaded.
1899 for node in isolated_format.walk_includes(self.root):
1900 if node not in processed:
1901 # Not visited, and not yet loaded -> wait for it to load.
1902 if not node.is_loaded:
1903 break
1904 # Not visited and loaded -> process it and continue the traversal.
1905 self._start_fetching_files(node, fetch_queue)
1906 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001907
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001908 # All *.isolated files should be processed by now and only them.
1909 all_isolateds = set(isolated_format.walk_includes(self.root))
1910 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001911
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001912 # Extract 'command' and other bundle properties.
1913 for node in isolated_format.walk_includes(self.root):
1914 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001915 self.relative_cwd = self.relative_cwd or ''
1916
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001917 def _start_fetching_files(self, isolated, fetch_queue):
1918 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001919
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001920 Modifies self.files.
1921 """
1922 logging.debug('fetch_files(%s)', isolated.obj_hash)
1923 for filepath, properties in isolated.data.get('files', {}).iteritems():
1924 # Root isolated has priority on the files being mapped. In particular,
1925 # overridden files must not be fetched.
1926 if filepath not in self.files:
1927 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001928
1929 # Make sure if the isolated is read only, the mode doesn't have write
1930 # bits.
1931 if 'm' in properties and self.read_only:
1932 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1933
1934 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001935 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001936 logging.debug('fetching %s', filepath)
1937 fetch_queue.add(
1938 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1939
1940 def _update_self(self, node):
1941 """Extracts bundle global parameters from loaded *.isolated file.
1942
1943 Will be called with each loaded *.isolated file in order of traversal of
1944 isolated include graph (see isolated_format.walk_includes).
1945 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001946 # Grabs properties.
1947 if not self.command and node.data.get('command'):
1948 # Ensure paths are correctly separated on windows.
1949 self.command = node.data['command']
1950 if self.command:
1951 self.command[0] = self.command[0].replace('/', os.path.sep)
1952 self.command = tools.fix_python_path(self.command)
1953 if self.read_only is None and node.data.get('read_only') is not None:
1954 self.read_only = node.data['read_only']
1955 if (self.relative_cwd is None and
1956 node.data.get('relative_cwd') is not None):
1957 self.relative_cwd = node.data['relative_cwd']
1958
1959
Vadim Shtayura8623c272014-12-01 11:45:27 -08001960def set_storage_api_class(cls):
1961 """Replaces StorageApi implementation used by default."""
1962 global _storage_api_cls
1963 assert _storage_api_cls is None
1964 assert issubclass(cls, StorageApi)
1965 _storage_api_cls = cls
1966
1967
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001968def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001969 """Returns an object that implements low-level StorageApi interface.
1970
1971 It is used by Storage to work with single isolate |namespace|. It should
1972 rarely be used directly by clients, see 'get_storage' for
1973 a better alternative.
1974
1975 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001976 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001977 namespace: isolate namespace to operate in, also defines hashing and
1978 compression scheme used, i.e. namespace names that end with '-gzip'
1979 store compressed data.
1980
1981 Returns:
1982 Instance of StorageApi subclass.
1983 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001984 cls = _storage_api_cls or IsolateServer
1985 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001986
1987
def get_storage(url, namespace):
  """Builds a Storage that can upload to and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  api = get_storage_api(url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00002001
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00002002
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Build FileItem objects out of |infiles|, dropping duplicate paths.
  # Symlinks ('l' key in the metadata) are filtered out as well, since they
  # are not represented by items on the isolate server side.
  to_upload = []
  known_paths = set()
  dropped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' in metadata or filepath in known_paths:
      dropped += 1
      continue
    known_paths.add(filepath)
    to_upload.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', dropped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(to_upload)
Vadim Shtayura3148e072014-09-02 18:51:52 -07002033
2034
maruel4409e302016-07-19 14:25:51 -07002035def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002036 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002037
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002038 Arguments:
2039 isolated_hash: hash of the root *.isolated file.
2040 storage: Storage class that communicates with isolate storage.
2041 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002042 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07002043 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002044
2045 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002046 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002047 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002048 logging.debug(
maruel4409e302016-07-19 14:25:51 -07002049 'fetch_isolated(%s, %s, %s, %s, %s)',
2050 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002051 # Hash algorithm to use, defined by namespace |storage| is using.
2052 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002053 with cache:
2054 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002055 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002056
2057 with tools.Profiler('GetIsolateds'):
2058 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002059 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002060 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07002061 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002062 try:
maruel1ceb3872015-10-14 06:10:44 -07002063 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002064 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002065 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04002066 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
2067 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002068
2069 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002070 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002071
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002072 with tools.Profiler('GetRest'):
2073 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07002074 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002075 create_directories(outdir, bundle.files)
2076 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002077
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002078 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002079 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07002080 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002081
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002082 # Multimap: digest -> list of pairs (path, props).
2083 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002084 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002085 if 'h' in props:
2086 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002087
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002088 # Now block on the remaining files to be downloaded and mapped.
2089 logging.info('Retrieving remaining files (%d of them)...',
2090 fetch_queue.pending_count)
2091 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07002092 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002093 while remaining:
2094 detector.ping()
2095
2096 # Wait for any item to finish fetching to cache.
2097 digest = fetch_queue.wait(remaining)
2098
tansell9e04a8d2016-07-28 09:31:59 -07002099 # Create the files in the destination using item in cache as the
2100 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002101 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07002102 fullpath = os.path.join(outdir, filepath)
2103
2104 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07002105 filetype = props.get('t', 'basic')
2106
2107 if filetype == 'basic':
2108 file_mode = props.get('m')
2109 if file_mode:
2110 # Ignore all bits apart from the user
2111 file_mode &= 0700
2112 putfile(
2113 srcfileobj, fullpath, file_mode,
2114 use_symlink=use_symlinks)
2115
tansell26de79e2016-11-13 18:41:11 -08002116 elif filetype == 'tar':
2117 basedir = os.path.dirname(fullpath)
2118 with tarfile.TarFile(fileobj=srcfileobj) as extractor:
2119 for ti in extractor:
2120 if not ti.isfile():
2121 logging.warning(
2122 'Path(%r) is nonfile (%s), skipped',
2123 ti.name, ti.type)
2124 continue
2125 fp = os.path.normpath(os.path.join(basedir, ti.name))
2126 if not fp.startswith(basedir):
2127 logging.error(
2128 'Path(%r) is outside root directory',
2129 fp)
2130 ifd = extractor.extractfile(ti)
2131 file_path.ensure_tree(os.path.dirname(fp))
2132 putfile(ifd, fp, 0700, ti.size)
2133
tanselle4288c32016-07-28 09:45:40 -07002134 elif filetype == 'ar':
2135 basedir = os.path.dirname(fullpath)
2136 extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
2137 for ai, ifd in extractor:
2138 fp = os.path.normpath(os.path.join(basedir, ai.name))
tansell26de79e2016-11-13 18:41:11 -08002139 if not fp.startswith(basedir):
2140 logging.error(
2141 'Path(%r) is outside root directory',
2142 fp)
tanselle4288c32016-07-28 09:45:40 -07002143 file_path.ensure_tree(os.path.dirname(fp))
2144 putfile(ifd, fp, 0700, ai.size)
2145
2146 else:
2147 raise isolated_format.IsolatedError(
2148 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002149
2150 # Report progress.
2151 duration = time.time() - last_update
2152 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
2153 msg = '%d files remaining...' % len(remaining)
2154 print msg
2155 logging.info(msg)
2156 last_update = time.time()
2157
2158 # Cache could evict some items we just tried to fetch, it's a fatal error.
2159 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002160 raise isolated_format.MappingError(
2161 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07002162 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002163
2164
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    # Strip the 't' entry (presumably the timestamp) — it is not wanted in
    # the resulting .isolated metadata.
    meta.pop('t')
    metadata[relpath] = meta
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      # Entries without a digest (e.g. symlinks) are represented by metadata
      # only, not by uploadable items.
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
2186
2187
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.

  Raises:
    Error: on duplicate input paths, on a path that is neither a file nor a
        directory, or when an OSError occurs while processing a path.
  """
  assert all(isinstance(i, unicode) for i in files), files
  # Compare on absolute paths so 'a' and './a' count as the same entry.
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          # mkstemp opened the file; only the path is needed from here on.
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated is uploaded too, at high priority.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          # The directory is reported via the hash of its .isolated file.
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    # NOTE(review): 'cold' = items present in |uploaded| (presumably the ones
    # that actually had to be transferred), 'hot' = the rest; relies on list
    # membership, O(n^2) for large trees — confirm before optimizing.
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    # Always delete the scratch directory holding generated .isolated files.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002262
2263
def archive(out, namespace, files, blacklist):
  """Archives |files| to the isolate server and prints 'hash path' lines.

  Arguments:
    out: URL of the isolate server.
    namespace: namespace to upload into.
    files: list of paths to upload; ['-'] means "read the list from stdin".
    blacklist: list of regexps used to skip files when walking directories.
  """
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Only the (hash, path) pairs matter here; drop the cold/hot statistics.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (h, f) for h, f in results))
2277
2278
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # Registers --isolate-server/--namespace and --blacklist flags.
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  # Normalizes the server URL, installs error reporting and authenticates.
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Surface archival failures as a normal command-line error.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002300
2301
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of --isolated / --file must be given.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Mapping a tree requires a fresh (empty or missing) target directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest has been written out.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # NOTE(review): |rel| already contains options.target, so the join
        # below is a no-op for the absolute target path — confirm before
        # simplifying to just |rel|.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
2376
2377
def add_archive_options(parser):
  """Adds the upload-side --blacklist option to |parser|."""
  parser.add_option(
      '--blacklist',
      default=list(DEFAULT_BLACKLIST),
      action='append',
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2384
2385
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser.

  --isolate-server defaults to the ISOLATE_SERVER environment variable when it
  is set; --namespace defaults to 'default-gzip'.
  """
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2397
2398
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Normalizes the server URL in-place, optionally installs the crash
  reporting handler and ensures the user is authenticated.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    # Nothing to validate; fail only when the option is mandatory.
    if required:
      parser.error('--isolate-server is required.')
    return

  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)

  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)

  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002420
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002421
def add_cache_options(parser):
  """Adds the local cache tuning options to |parser| in their own group."""
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # Numeric knobs controlling when the cache gets trimmed.
  for flag, default, helptext in (
      ('--max-cache-size',
       50*1024*1024*1024,
       'Trim if the cache gets larger than this value, default=%default'),
      ('--min-free-space',
       2*1024*1024*1024,
       'Trim if disk free space becomes lower than this value, '
       'default=%default'),
      ('--max-items',
       100000,
       'Trim if more than this number of items are in the cache '
       'default=%default')):
    cache_group.add_option(
        flag, type='int', metavar='NNN', default=default, help=helptext)
  parser.add_option_group(cache_group)
2449
2450
def process_cache_options(options, trim=True):
  """Returns the content cache selected by the parsed cache options.

  A DiskCache rooted at --cache when one was requested, otherwise a purely
  in-memory MemoryCache.
  """
  if not options.cache:
    return MemoryCache()

  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace),
      trim=trim)
2464
2465
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser with logging, version and authentication flags baked in."""

  def __init__(self, **kwargs):
    # Explicit base call: optparse-based parsers are old-style classes on
    # python 2, so super() is not used here.
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then applies the authentication options."""
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args
2480
2481
2482def main(args):
2483 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002484 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002485
2486
2487if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07002488 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002489 fix_encoding.fix_encoding()
2490 tools.disable_buffering()
2491 colorama.init()
maruel4409e302016-07-19 14:25:51 -07002492 file_path.enable_symlink()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002493 sys.exit(main(sys.argv[1:]))