blob: 4e8fdfec121f3c018d2192134ad88e81d4259e0a [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
tansell9e04a8d2016-07-28 09:31:59 -07008__version__ = '0.6.0'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
nodir90bc8dc2016-06-15 13:35:21 -070011import errno
tansell9e04a8d2016-07-28 09:31:59 -070012import functools
13import io
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040015import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040018import signal
tansell9e04a8d2016-07-28 09:31:59 -070019import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050021import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000022import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050024import types
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000025import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000026
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000027from third_party import colorama
28from third_party.depot_tools import fix_encoding
29from third_party.depot_tools import subcommand
30
tanselle4288c32016-07-28 09:45:40 -070031from libs import arfile
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050032from utils import file_path
maruel12e30012015-10-09 11:55:35 -070033from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040034from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040035from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000036from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040037from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070038from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000039from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000040from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000041
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080042import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040043import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080044
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000045
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of paths that are never archived by default.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
109
110
class Error(Exception):
  """Base class for runtime errors raised by this module."""
114
115
class Aborted(Error):
  """Raised when an in-flight operation was cancelled."""
119
120
class AlreadyExists(Error):
  """Raised when the destination file already exists."""
124
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Generator producing the content of |path| in |chunk_size| pieces.

  Reading starts at byte |offset|.
  """
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    while True:
      piece = stream.read(chunk_size)
      if not piece:
        return
      yield piece
135
136
def file_write(path, content_generator):
  """Streams the chunks produced by |content_generator| into the file |path|.

  Missing intermediate directories are created first.

  Returns:
    The number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000153
154
def fileobj_path(fileobj):
  """Returns the file system path backing a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  path = getattr(fileobj, 'name', None)
  if path is None:
    return None

  # Objects produced by e.g. open("test.txt") (possibly outside our control,
  # like in the standard library) expose a str name. All paths in this code
  # base are unicode, so decode it with the file system encoding.
  if not isinstance(path, unicode):
    path = path.decode(sys.getfilesystemencoding())

  return path if fs.exists(path) else None
174
175
176# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
177# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj into dstfileobj.

  When |size| is given, exactly that many bytes are transferred and an IOError
  is raised if the source runs out early. With size == -1, everything up to
  the EOF marker is copied.
  """
  # A partial copy only makes sense from the start of the stream.
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    request = chunk_size if size <= 0 else min(chunk_size, size - copied)
    buf = srcfileobj.read(request)
    if not buf:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(buf)
    copied += len(buf)
203
204
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # BUG FIX: the check previously lacked the 'not', so a mode with any write
    # bit set was treated as "read only" and linked into place, while truly
    # read-only files were copied. A file is only safe to hard/symlink when
    # nobody can write to it (a task could otherwise corrupt the cached copy).
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # No backing file system path (or an explicit size): stream the content.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
267
268
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
279
280
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Output is produced in pieces no larger than |chunk_size| so that a zip bomb
  cannot make zlib preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_read = 0
  try:
    for piece in content_generator:
      bytes_read += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain whatever did not fit in the first bounded call.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_read, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
311
312
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed,
  7 otherwise.
  """
  # BUG FIX: os.path.splitext() keeps the leading dot (e.g. '.png') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'png'), so the
  # membership test could never match and already-compressed files were
  # pointlessly recompressed at level 7. Strip the dot before comparing.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
318
319
def create_directories(base_directory, files):
  """Creates every directory needed to hold the given relative file paths."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each file's directory together with all of its ancestors.
  wanted = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in wanted:
      wanted.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for rel_dir in sorted(wanted):
    fs.mkdir(os.path.join(base_directory, rel_dir))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000332
333
def create_symlinks(base_directory, files):
  """Materializes the symlinks described by the given set of files.

  Only entries that carry an 'l' (link) property are processed.

  Raises:
    AlreadyExists: if the link destination is already present.
  """
  for relpath, props in files:
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    target = os.path.join(base_directory, relpath)
    try:
      os.symlink(props['l'], target)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % target)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000350
351
def is_valid_file(path, size):
  """Checks that the file at |path| looks valid.

  Currently only the size is verified.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  on_disk = fs.stat(path).st_size
  if on_disk == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), on_disk, size)
  return False
366
367
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they are
  derived lazily from content() by prepare(). A caller-provided digest MUST
  correspond to the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread, and finally returns to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Fills in self.digest and self.size from content() when unknown.

    Does nothing if both are already set.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000407
408
class FileItem(Item):
  """A file on disk to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they are
  derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat the file when the caller did not supply the size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000426
427
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self.buffer = buf

  def content(self):
    # Single-element list so the data can be iterated more than once.
    return [self.buffer]
437
438
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000439class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800440 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000441
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800442 Implements compression support, parallel 'contains' checks, parallel uploads
443 and more.
444
445 Works only within single namespace (and thus hashing algorithm and compression
446 scheme are fixed).
447
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400448 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
449 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800450 """
451
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700452 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000453 self._storage_api = storage_api
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400454 self._use_zip = isolated_format.is_namespace_with_compression(
455 storage_api.namespace)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400456 self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000457 self._cpu_thread_pool = None
458 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400459 self._aborted = False
460 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000461
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000462 @property
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700463 def hash_algo(self):
464 """Hashing algorithm used to name files in storage based on their content.
465
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400466 Defined by |namespace|. See also isolated_format.get_hash_algo().
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700467 """
468 return self._hash_algo
469
470 @property
471 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500472 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700473 return self._storage_api.location
474
475 @property
476 def namespace(self):
477 """Isolate namespace used by this storage.
478
479 Indirectly defines hashing scheme and compression method used.
480 """
481 return self._storage_api.namespace
482
483 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000484 def cpu_thread_pool(self):
485 """ThreadPool for CPU-bound tasks like zipping."""
486 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500487 threads = max(threading_utils.num_processors(), 2)
488 if sys.maxsize <= 2L**32:
489 # On 32 bits userland, do not try to use more than 16 threads.
490 threads = min(threads, 16)
491 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000492 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000493
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000494 @property
495 def net_thread_pool(self):
496 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
497 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700498 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000499 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000500
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000501 def close(self):
502 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400503 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000504 if self._cpu_thread_pool:
505 self._cpu_thread_pool.join()
506 self._cpu_thread_pool.close()
507 self._cpu_thread_pool = None
508 if self._net_thread_pool:
509 self._net_thread_pool.join()
510 self._net_thread_pool.close()
511 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400512 logging.info('Done.')
513
514 def abort(self):
515 """Cancels any pending or future operations."""
516 # This is not strictly theadsafe, but in the worst case the logging message
517 # will be printed twice. Not a big deal. In other places it is assumed that
518 # unprotected reads and writes to _aborted are serializable (it is true
519 # for python) and thus no locking is used.
520 if not self._aborted:
521 logging.warning('Aborting... It can take a while.')
522 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000523
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000524 def __enter__(self):
525 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400526 assert not self._prev_sig_handlers, self._prev_sig_handlers
527 for s in (signal.SIGINT, signal.SIGTERM):
528 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000529 return self
530
531 def __exit__(self, _exc_type, _exc_value, _traceback):
532 """Context manager interface."""
533 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400534 while self._prev_sig_handlers:
535 s, h = self._prev_sig_handlers.popitem()
536 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000537 return False
538
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    # NOTE(review): get_missing_items is defined elsewhere in this class (not
    # visible here); presumably it yields (item, push_state) pairs for items
    # absent from the server -- verify against its definition.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    # Cache hits are the deduplicated items the server already had.
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
613
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|.

      Runs on a net_thread_pool worker; captures item/push_state by closure.
      """
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Surface the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      # Hand the zipped payload off to the network pool for the actual upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000665
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800666 def push(self, item, push_state):
667 """Synchronously pushes a single item to the server.
668
669 If you need to push many items at once, consider using 'upload_items' or
670 'async_push' with instance of TaskChannel.
671
672 Arguments:
673 item: item to upload as instance of Item class.
674 push_state: push state returned by 'get_missing_items' call for |item|.
675
676 Returns:
677 Pushed item (same object as |item|).
678 """
679 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700680 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800681 self.async_push(channel, item, push_state)
682 pushed = channel.pull()
683 assert pushed is item
684 return item
685
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000686 def async_fetch(self, channel, priority, digest, size, sink):
687 """Starts asynchronous fetch from the server in a parallel thread.
688
689 Arguments:
690 channel: TaskChannel that receives back |digest| when download ends.
691 priority: thread pool task priority for the fetch.
692 digest: hex digest of an item to download.
693 size: expected size of the item (after decompression).
694 sink: function that will be called as sink(generator).
695 """
696 def fetch():
697 try:
698 # Prepare reading pipeline.
699 stream = self._storage_api.fetch(digest)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700700 if self._use_zip:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400701 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000702 # Run |stream| through verifier that will assert its size.
703 verifier = FetchStreamVerifier(stream, size)
704 # Verified stream goes to |sink|.
705 sink(verifier.run())
706 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800707 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000708 raise
709 return digest
710
711 # Don't bother with zip_thread_pool for decompression. Decompression is
712 # really fast and most probably IO bound anyway.
713 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
714
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000715 def get_missing_items(self, items):
716 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000717
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000718 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000719
720 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000721 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000722
723 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800724 For each missing item it yields a pair (item, push_state), where:
725 * item - Item object that is missing (one of |items|).
726 * push_state - opaque object that contains storage specific information
727 describing how to upload the item (for example in case of cloud
728 storage, it is signed upload URLs). It can later be passed to
729 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000730 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000731 channel = threading_utils.TaskChannel()
732 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800733
734 # Ensure all digests are calculated.
735 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700736 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800737
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400738 def contains(batch):
739 if self._aborted:
740 raise Aborted()
741 return self._storage_api.contains(batch)
742
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000743 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800744 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400745 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400746 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000747 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800748
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000749 # Yield results as they come in.
750 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800751 for missing_item, push_state in channel.pull().iteritems():
752 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000753
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000754
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  # Batch sizes ramp up per ITEMS_PER_CONTAINS_QUERIES; once past the end of
  # that table, the last configured size is reused for every further batch.
  limits = ITEMS_PER_CONTAINS_QUERIES
  batches_done = 0
  current = []
  # Items are processed in descending size order.
  for entry in sorted(items, key=lambda i: i.size, reverse=True):
    current.append(entry)
    if len(current) == limits[min(batches_done, len(limits) - 1)]:
      yield current
      current = []
      batches_done += 1
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000781
782
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Channel on which completed fetches report back their digest.
    self._channel = threading_utils.TaskChannel()
    # Digests whose fetch has been started but has not completed yet.
    self._pending = set()
    # Every digest ever requested via 'add'; checked by verify_all_cached.
    self._accessed = set()
    # Digests already present in the cache (grows as fetches complete).
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|.

    No-op if the digest is already being fetched or is already cached (in the
    latter case only its LRU position is refreshed). Completion is observed
    via 'wait'.
    """
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching. The fetched stream is written into the cache under
    # |digest| via the functools.partial sink.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching. Completions for digests
    # outside |digests| are still recorded so later 'wait' calls see them.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    |algo| is a hashing constructor (e.g. hashlib.sha1-like); the file's
    digest is computed from its full content. Returns the digest.
    """
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
878
879
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # A stream is mandatory; an unknown size is expressed via expected_size.
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that re-yields chunks of |stream| after validation.

    Holds each chunk back until the next one arrives, so the stream can be
    size-checked before its final chunk ever reaches the consumer.

    IOError raised by the consumer is converted into MappingError, since
    otherwise Storage would retry the fetch on unrelated local cache errors.
    """
    held = None
    for incoming in self.stream:
      assert incoming is not None
      if held is not None:
        self._inspect_chunk(held, is_last=False)
        try:
          yield held
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      held = incoming
    if held is not None:
      # |held| is now known to be the final chunk: validate the total size.
      self._inspect_chunk(held, is_last=True)
      try:
        yield held
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Accounts for |chunk| and validates the total size on the last one."""
    self.current_size += len(chunk)
    if not is_last:
      return
    if self.expected_size == UNKNOWN_FILE_SIZE:
      return
    if self.expected_size == self.current_size:
      return
    raise IOError('Incorrect file size: expected %d, got %d' % (
        self.expected_size, self.current_size))
927
928
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing schemes in use; those
  details are handled by the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through a 'contains' call first to obtain |push_state|
    before it can be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data to push and the
    data provided by the item are not the same. In that case |content| is not
    None and yields chunks of compressed data (using item.content() as the
    source of the original uncompressed data). This is implemented by the
    Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()
1002
1003
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    # A non-empty 'gs_upload_url' means the blob goes straight to Google
    # Storage (and needs a finalize call); otherwise it is stored inline
    # through the isolate service and no finalization is required.
    gs_url = preupload_status.get('gs_upload_url') or None
    self.upload_url = gs_url or 'api/isolateservice/v1/store_inline'
    self.finalize_url = (
        'api/isolateservice/v1/finalize_gs_upload' if gs_url else None)
    # Progress flags flipped by IsolateServer.push as the upload advances.
    self.uploaded = False
    self.finalized = False
    self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -05001022
1023
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Namespace descriptor sent along with every API request.
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    # Guards _server_caps and _memory_use below.
    self._lock = threading.Lock()
    # Lazily fetched /server_details response; see _server_capabilities.
    self._server_caps = None
    # Bytes of upload payloads currently held in memory; see push().
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    # See StorageApi.location.
    return self._base_url

  @property
  def namespace(self):
    # See StorageApi.namespace.
    return self._namespace

  def fetch(self, digest, offset=0):
    """Yields chunks of the item's content. See StorageApi.fetch."""
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
            source_url, self._namespace, digest))

    # for DB uploads: small items come back inline, base64-encoded.
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities: the response carries a URL to stream the data from.
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    """Uploads |item| to the server. See StorageApi.push."""
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it can not be re-winded back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized in
      # memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      # Throttle until serializing this payload fits under the memory budget.
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      # Always release the memory budget reserved above.
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    """Checks which of |items| are missing. See StorageApi.contains."""
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      # 'index' maps the status back to its position in the request body.
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hex digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance describing where and how
          to upload (inline store vs. Google Storage).
      content: an iterable that yields 'str' chunks.

    Returns:
      True on success (for GS uploads, any non-None response counts).
    """
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None
1317
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001318
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    msg = 'Item with digest %r is not found in cache' % digest
    super(CacheMiss, self).__init__(msg)
    # Digest of the missing item, kept for callers that want to retry a fetch.
    self.digest = digest
1326
1327
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  Instances may be used concurrently from multiple threads, so subclasses
  should protect their internal state with some lock.
  """
  # Directory backing the cache on disk, when applicable. None for caches
  # with no on-disk representation.
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _tb):
    """Context manager interface."""
    return False

  @property
  def added(self):
    """Copy of the sizes of items added so far."""
    return list(self._added)

  @property
  def evicted(self):
    """Copy of the sizes of items evicted so far."""
    return list(self._evicted)

  @property
  def used(self):
    """Copy of the sizes of items read back so far."""
    return list(self._used)

  @property
  def initial_number_items(self):
    """Number of items present when the cache was loaded."""
    return self._initial_number_items

  @property
  def initial_size(self):
    """Total size of items present when the cache was loaded."""
    return self._initial_size

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()
1414
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001415
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Maps digest -> content blob ('str'). Guarded by self._lock.
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    # Nothing to clean up: there is no backing store that can get corrupted.
    pass

  def touch(self, digest, size):
    # In-memory content cannot get corrupted; presence is sufficient.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # Bug fix: self._evicted is a list, so the original call to the
        # nonexistent list.add() raised AttributeError on every successful
        # evict. Record the evicted item's size, consistent with what
        # write() appends to self._added and DiskCache stores in _evicted.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
      return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001465
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001466
class CachePolicies(object):
  """Holds the retention limits enforced when trimming a cache.

  Each limit is disabled when set to 0:
  - max_cache_size: trim once the cache grows past this many bytes; 0 makes
    the cache effectively a leak.
  - min_free_space: trim once free disk space drops below this many bytes;
    0 unconditionally fills the disk.
  - max_items: maximum number of items kept in the cache; 0 means no limit
    on item count.
  """

  def __init__(self, max_cache_size, min_free_space, max_items):
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1481
1482
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim=True):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
        It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent that _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim=trim)

  def __contains__(self, digest):
    """Returns True if |digest| is tracked by the LRU state."""
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface; trims the cache and logs statistics."""
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    """Returns a new set of all digests tracked by the LRU state."""
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid.

    Note that it doesn't compute the hash so it could still be corrupted if the
    file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    """Removes item from the LRU state and deletes its backing file."""
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    """Returns a read-only file object for |digest|; raises CacheMiss."""
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    """Writes |content| chunks to disk under |digest| and returns digest."""
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      #   1) Inside |content| generator in case of network or unzipping errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies."""
    with self._lock:
      self._trim()

  def _load(self, trim):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to map the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug("Removing LRU file %s", digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space when
    # the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1820
1821
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of arguments; may stay empty.
    self.command = []
    # Maps file path -> properties dict, merged across all includes.
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.

    Arguments:
      fetch_queue: object with add()/wait()/cache used to schedule fetches.
      root_isolated_hash: hash digest of the root *.isolated file.
      algo: hashing algorithm, passed to isolated_format.IsolatedFile.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Schedules a high-priority fetch of one *.isolated file, guarding
      # against include cycles via |seen|.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. First node (in traversal order) that sets a property
    # wins; later nodes cannot override it.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1950
1951
def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default.

  May be called at most once (asserts the default has not already been
  replaced), before the first get_storage_api() call.

  Args:
    cls: a StorageApi subclass to use instead of IsolateServer.
  """
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls
1958
1959
def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Honor a class installed via set_storage_api_class(), falling back to the
  # default IsolateServer implementation.
  if _storage_api_cls is not None:
    return _storage_api_cls(url, namespace)
  return IsolateServer(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001978
1979
def get_storage(url, namespace):
  """Builds a high-level Storage wrapper bound to |namespace| on |url|.

  Arguments:
    url: URL of the isolate service providing the shared storage.
    namespace: isolate namespace to operate in; it also selects the hashing
        and compression scheme, e.g. namespace names ending with '-gzip'
        hold compressed data.

  Returns:
    A Storage instance.
  """
  storage_api = get_storage_api(url, namespace)
  return Storage(storage_api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001993
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001994
def upload_tree(base_url, infiles, namespace):
  """Uploads a tree of files described by |infiles| to the isolate server.

  Arguments:
    base_url: URL of the isolate server receiving the upload.
    infiles: iterable of (absolute path, metadata dict) pairs.
    namespace: server-side namespace to store the items in.
  """
  # Symlinks carry an 'l' key and have no corresponding item on the server
  # side, so they are skipped, as are paths seen more than once.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07002025
2026
def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError when |isolated_hash| is neither a valid hash
        nor a readable local file, or when the cache evicted items before they
        could all be mapped.
    isolated_format.IsolatedError when an entry has an unknown file type.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          # Replaces |isolated_hash| with the digest of the injected file, so
          # the rest of the flow is identical for both cases.
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        # Entries without 'h' (symlinks) have no content to fetch; they were
        # already created by create_symlinks() above.
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Create the files in the destination using item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            with cache.getfileobj(digest) as srcfileobj:
              filetype = props.get('t', 'basic')

              if filetype == 'basic':
                file_mode = props.get('m')
                if file_mode:
                  # Ignore all bits apart from the user
                  file_mode &= 0700
                putfile(
                    srcfileobj, fullpath, file_mode,
                    use_symlink=use_symlinks)

              elif filetype == 'ar':
                # The cached item is an 'ar' archive; unpack every member
                # relative to the destination file's directory.
                basedir = os.path.dirname(fullpath)
                extractor = arfile.ArFileReader(srcfileobj, fullparse=False)
                for ai, ifd in extractor:
                  fp = os.path.normpath(os.path.join(basedir, ai.name))
                  file_path.ensure_tree(os.path.dirname(fp))
                  putfile(ifd, fp, 0700, ai.size)

              else:
                raise isolated_format.IsolatedError(
                    'Unknown file type %r', filetype)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    raise isolated_format.MappingError(
        'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002133
2134
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  # Hash every entry found under |root|. The 't' (timestamp) key is stripped
  # from the metadata since it is not part of what gets stored.
  metadata = {}
  for relpath in paths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    meta.pop('t')
    metadata[relpath] = meta
  # Only entries with a content hash ('h') become uploadable items; symlinks
  # do not have one. The .isolated files get uploaded first.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
2156
2157
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.

  Raises:
    Error when duplicate paths are passed, when an entry is neither a file
    nor a directory, or on an OS level failure while processing an entry.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated file is itself uploaded, flagged high
          # priority, and its hash stands for the whole directory in
          # |results|.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        # NOTE(review): the underlying OSError detail is discarded here; only
        # the offending path is surfaced to the caller.
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    # 'cold' is the subset upload_items() reported back; 'hot' is the rest.
    # NOTE(review): the membership test is linear in len(uploaded) per item.
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    # Always delete the staging directory holding the generated .isolated
    # files, even on failure.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002232
2233
def archive(out, namespace, files, blacklist):
  """Archives |files| to the server and prints one 'hash path' line per entry.

  Arguments:
    out: URL of the isolate server to upload to.
    files: list of paths to upload; the single entry '-' means one path per
        line on stdin.
    namespace: namespace to upload the items in.
    blacklist: list of regexps of files to skip while walking directories.

  Raises:
    Error if there is nothing to upload or if an entry fails to process.
  """
  if files == ['-']:
    # Strip the line terminators that readlines() keeps; otherwise the paths
    # would never match an existing file or directory below.
    files = [l.rstrip('\r\n') for l in sys.stdin.readlines()]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats (the cold/hot lists); only (hash, path) pairs are printed.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2247
2248
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  # set_exception_handler=True, required=True: a server URL is mandatory here.
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Error carries a user-facing message; surface it through the parser.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002270
2271
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  # --isolated and --file are mutually exclusive ways to select what to fetch.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options)
  # Prune the cache before downloading anything new.
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to map a tree over a pre-existing file or non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # NOTE(review): |rel| already includes options.target; joining it with
        # options.target again below looks redundant — verify intent.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
            os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
2346
2347
def add_archive_options(parser):
  """Adds the --blacklist filtering option used when archiving directories."""
  parser.add_option(
      '--blacklist',
      action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2354
2355
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL',
      default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2367
2368
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Validates --isolate-server and logs the caller in.

  Returns the identity as determined by the server, or None when the option
  was omitted and |required| is False.
  """
  server = options.isolate_server
  if not server:
    if required:
      parser.error('--isolate-server is required.')
    return

  try:
    server = net.fix_url(server)
  except ValueError as exc:
    parser.error('--isolate-server %s' % exc)
  options.isolate_server = server
  if set_exception_handler:
    on_error.report_on_exception_exit(server)
  try:
    return auth.ensure_logged_in(server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002390
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002391
def add_cache_options(parser):
  """Adds the local cache tuning options to parser."""
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # The three trimming thresholds share the same shape; declare them as data.
  for flag, default, text in (
      ('--max-cache-size', 50*1024*1024*1024,
       'Trim if the cache gets larger than this value, default=%default'),
      ('--min-free-space', 2*1024*1024*1024,
       'Trim if disk free space becomes lower than this value, '
       'default=%default'),
      ('--max-items', 100000,
       'Trim if more than this number of items are in the cache '
       'default=%default')):
    cache_group.add_option(
        flag, type='int', metavar='NNN', default=default, help=text)
  parser.add_option_group(cache_group)
2419
2420
def process_cache_options(options, trim=True):
  """Returns the content cache selected by |options|.

  Arguments:
    options: parsed options, as populated by add_cache_options().
    trim: whether the disk cache should be trimmed at creation time.

  Returns:
    DiskCache when --cache was given, MemoryCache otherwise.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # The directory at |options.cache| may not exist until the DiskCache()
  # instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace),
      trim=trim)
2434
2435
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser with logging, version and authentication options built in."""

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Every command of this tool accepts the shared authentication options.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then applies the authentication options."""
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args
2450
2451
def main(args):
  """Dispatches |args| to the matching CMD* subcommand and returns its code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002455
2456
if __name__ == '__main__':
  # Process-wide setup must happen before any output or subprocess is spawned:
  # inhibit OS error reporting, then normalize stream encoding and buffering.
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  # Enables OS symlink support, used by --use-symlinks (see CMDdownload).
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002463 sys.exit(main(sys.argv[1:]))