#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.6.0'

import base64
import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tempfile
import threading
import time
import types
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from libs import arfile
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check on the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more items per request, the lower the effect of HTTP round trip latency
# and TCP-level chattiness. On the other hand, larger values cause longer
# lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing"
# case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

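# For instance, on POSIX the patterns behave like this (a sketch; the file
# names are made up):
#   re.match(DEFAULT_BLACKLIST[0], 'foo.pyc')   # matches
#   re.match(DEFAULT_BLACKLIST[0], 'foo.py')    # None
#   re.match(DEFAULT_BLACKLIST[1], '.git')      # matches

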
# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


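# Together, file_read() and file_write() stream a file from one path to
# another without loading it fully in memory. A minimal sketch (the paths are
# hypothetical):
#   copied = file_write(u'/tmp/dst.bin', file_read(u'/tmp/src.bin'))
#   assert copied == fs.stat(u'/tmp/src.bin').st_size

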
def fileobj_path(fileobj):
  """Returns the file system path for a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created using something like open("test.txt"),
  # name will end up being a str (coming from a function outside our control,
  # like the standard library). We want all our paths to be unicode objects,
  # so we decode it.
  if not isinstance(name, unicode):
    name = name.decode(sys.getfilesystemencoding())

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is raised). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


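# A minimal in-memory sketch of the copy helper, using stdlib io objects:
#   src = io.BytesIO('x' * 100)
#   dst = io.BytesIO()
#   fileobj_copy(dst, src, size=100)
#   assert dst.getvalue() == src.getvalue()

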
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - It involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - The cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks have the same owner.
  - Anecdotally, ext2 is known to be potentially faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only, we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # A file_mode of 0 is actually valid, so an explicit None check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


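# A minimal sketch of writing a buffer out through putfile (the path is
# hypothetical; an io.BytesIO has no 'name', so the copy branch is taken):
#   putfile(io.BytesIO('hello'), u'/tmp/out.txt', file_mode=0444, size=5)

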
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


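# The two generators are inverses of each other; a quick round-trip sketch:
#   blob = ''.join(zip_compress(['hello '] * 3 + ['world']))
#   assert ''.join(zip_decompress([blob])) == 'hello hello hello world'

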
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # Strip the leading dot so the extension matches ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


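# E.g. get_zip_compression_level('photo.jpg') == 0 (already compressed, so
# stored as-is) while get_zip_compression_level('log.txt') == 7.

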
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      fs.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from content(). If digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


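# A sketch of how an item gets its digest; hashlib.sha1 matches the default
# 'sha-1' digest_hash used by IsolateServer below:
#   import hashlib
#   item = BufferItem('some payload')
#   item.prepare(hashlib.sha1)
#   print item.digest, item.size  # hex digest of the buffer, 12

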
class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

502 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400503 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000504 if self._cpu_thread_pool:
505 self._cpu_thread_pool.join()
506 self._cpu_thread_pool.close()
507 self._cpu_thread_pool = None
508 if self._net_thread_pool:
509 self._net_thread_pool.join()
510 self._net_thread_pool.close()
511 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400512 logging.info('Done.')
513
514 def abort(self):
515 """Cancels any pending or future operations."""
516 # This is not strictly theadsafe, but in the worst case the logging message
517 # will be printed twice. Not a big deal. In other places it is assumed that
518 # unprotected reads and writes to _aborted are serializable (it is true
519 # for python) and thus no locking is used.
520 if not self._aborted:
521 logging.warning('Aborting... It can take a while.')
522 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000523
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000524 def __enter__(self):
525 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400526 assert not self._prev_sig_handlers, self._prev_sig_handlers
527 for s in (signal.SIGINT, signal.SIGTERM):
528 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000529 return self
530
531 def __exit__(self, _exc_type, _exc_value, _traceback):
532 """Context manager interface."""
533 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400534 while self._prev_sig_handlers:
535 s, h = self._prev_sig_handlers.popitem()
536 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000537 return False
538
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
    logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

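  # A typical archival flow, as a sketch (the server URL and paths are
  # hypothetical; IsolateServer is defined further below):
  #   api = IsolateServer('https://isolate.example.com', 'default-gzip')
  #   with Storage(api) as storage:
  #     uploaded = storage.upload_items(
  #         [FileItem(u'/tmp/a.bin'), FileItem(u'/tmp/b.bin')])
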
  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
          describing how to upload the item (for example in case of cloud
          storage, it is signed upload URLs). It can later be passed to
          'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


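# Batches grow according to ITEMS_PER_CONTAINS_QUERIES, largest items first.
# For example, 290 items (a hypothetical list of Item instances) come out as
# batches of 20, 20, 50, 50, 50 and 100 items:
#   sizes = [len(b) for b in batch_items_for_check(items_290)]
#   assert sizes == [20, 20, 50, 50, 50, 100]

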
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


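# A typical consumer loop, as a sketch ('cache' is a LocalCache instance as
# referenced in the docstring above; 'wanted' is a set of hex digests):
#   queue = FetchQueue(storage, cache)
#   for digest in wanted:
#     queue.add(digest)
#   remaining = set(wanted)
#   while remaining:
#     remaining.discard(queue.wait(remaining))

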
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item are not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % self._base_url
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001128 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001129 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001130 assert item.digest is not None
1131 assert item.size is not None
1132 assert isinstance(push_state, _IsolateServerPushState)
1133 assert not push_state.finalized
1134
1135 # Default to item.content().
1136 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001137 logging.info('Push state size: %d', push_state.size)
1138 if isinstance(content, (basestring, list)):
1139 # Memory is already used, too late.
1140 with self._lock:
1141 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001142 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001143 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1144 # If |content| is indeed a generator, it can not be re-winded back to the
1145 # beginning of the stream. A retry will find it exhausted. A possible
1146 # solution is to wrap |content| generator with some sort of caching
1147 # restartable generator. It should be done alongside streaming support
1148 # implementation.
1149 #
1150 # In theory, we should keep the generator, so that it is not serialized in
1151 # memory. Sadly net.HttpService.request() requires the body to be
1152 # serialized.
1153 assert isinstance(content, types.GeneratorType), repr(content)
1154 slept = False
1155 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001156 # One byte less than 512mb. This is to cope with incompressible content.
1157 max_size = int(sys.maxsize * 0.25)
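      # Invariant kept by the loop below: at most |max_size| bytes of
      # serialized upload data are held in memory at once; an item larger
      # than |max_size| is only let through when nothing else is in flight.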
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling to one upload at a time, we can survive. Otherwise,
          # kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'File %s is already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

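    # The response mirrors the request: a dict with an 'items' list in which
    # each entry carries an 'index' into |items| and an 'upload_ticket';
    # items already present on the server are omitted from the list.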
    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hits',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)

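# A sketch of the typical handling: callers try the cache first and fall back
# to a remote fetch on a miss, e.g.:
#   try:
#     f = cache.getfileobj(digest)
#   except CacheMiss:
#     ...  # schedule a fetch through Storage, then retry.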

class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file-like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from the |content| generator and stores it in cache.

    Returns the digest to simplify chaining.
    """
    raise NotImplementedError()


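# A minimal usage sketch of the LocalCache interface; it works the same with
# any concrete implementation, e.g. the MemoryCache below or DiskCache:
#   cache = MemoryCache()
#   with cache:
#     cache.write(digest, [data])
#     with cache.getfileobj(digest) as f:
#       data = f.read()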
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
          all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
    return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value.
      If 0, the disk is filled unconditionally.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items

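# Example (illustrative values): cap the cache at 20GiB and 100k items while
# keeping at least 2GiB of disk free:
#   policies = CachePolicies(
#       max_cache_size=20 * 1024 * 1024 * 1024,
#       min_free_space=2 * 1024 * 1024 * 1024,
#       max_items=100000)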

class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in an LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    self._free_disk = 0
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the
    # LRU cache are also inherently protected. It could be a set() of all
    # items referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        # self._load() calls self._trim() which initializes self._free_disk.
        self._load()

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded and trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to detect
    # corruption, then save to ensure state.json is up to date. Sadly, on a
    # 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last
    # cleanup() call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if the file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have the
    # write access bit removed which would cause the file_write() call to fail
    # to open it in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete the incomplete file and propagate the exception to
      # the caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def _load(self):
    """Loads the state of the cache from the json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not fs.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)
    self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, makes sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage and self.policies.max_cache_size:
        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, size = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to map the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, size = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached values. self._trim() will be called later to
    # enforce real trimming but doing this quick version here makes it
    # possible to map an isolated tree that is larger than the current amount
    # of free disk space when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure that if the isolated is read only, the mode doesn't have
        # write bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls

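# Example (sketch): tests can inject a fake backend before any storage object
# is created:
#   class FakeStorageApi(StorageApi):
#     ...  # override do_fetch / do_push / contains, etc.
#   set_storage_api_class(FakeStorageApi)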

def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))

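# Example usage (the server URL is illustrative only):
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     storage.upload_items(items)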

def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')
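              # The 't' property selects how the cached blob maps onto files:
              # 'basic' is a plain file; 'ar' is an ar archive whose members
              # are extracted relative to the destination directory.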

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user's.
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'ar':
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r', filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

      # Cache could evict some items we just tried to fetch, it's a fatal
      # error.
      if not fetch_queue.verify_all_cached():
        raise isolated_format.MappingError(
            'Cache is too small to hold all requested files')
  return bundle


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
    FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated'))
    for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
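  # Each value of |metadata| is the file's .isolated properties dict: 'h'
  # (hash digest), 's' (size) and 'm' (mode) for regular files, or 'l' (link
  # target) for symlinks; symlink entries carry no 'h'.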
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of --isolated or --file must be specified.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Each pull() returns the digest of a fetch that completed.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test, please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


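# Example invocations (a sketch only; hashes and the server URL are
# illustrative):
#   python isolateserver.py download -I https://isolate.example.com \
#       -s deadbeefdeadbeefdeadbeefdeadbeefdeadbeef -t out --cache /tmp/cache
#   python isolateserver.py download -I https://isolate.example.com \
#       -f deadbeefdeadbeefdeadbeefdeadbeefdeadbeef payload.bin
# The first form materializes a whole isolated tree under --target; the
# second fetches individual files into it.

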
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server, or None if the option is
  unset and not required.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


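# Sketch of the expected flow (assumed; the URL is illustrative). The help
# text above states that https:// is assumed, so a bare host is normalized:
#   parser = optparse.OptionParser()
#   add_isolate_server_options(parser)
#   options, _ = parser.parse_args(['-I', 'isolate.example.com'])
#   process_isolate_server_options(parser, options, False, True)
#   # options.isolate_server is now 'https://isolate.example.com' and the
#   # caller is logged in via auth.ensure_logged_in().

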
def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()


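# Sketch of typical usage (assumed; the path is illustrative):
#   parser = optparse.OptionParser()
#   add_isolate_server_options(parser)  # supplies --namespace for the hash algo
#   add_cache_options(parser)
#   options, _ = parser.parse_args(['--cache', '/tmp/isolate_cache'])
#   cache = process_cache_options(options)
# Here a DiskCache bound to the namespace's hash algorithm is returned;
# without --cache, a MemoryCache would be returned instead.

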
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))