#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.5'

import base64
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check against the isolate server per /pre-upload
# query. All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# more likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
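# For example, a list of 145 items is checked in batches of 20, 20, 50, 50 and
# a final partial batch of 5; longer lists continue with one more batch of 50
# and then batches of 100 once the tuple is exhausted (see
# batch_items_for_check() below).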


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediate directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(path)
  if not fs.isdir(filedir):
    fs.makedirs(filedir)
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
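
# Note that file_read() and file_write() compose into a streaming copy that
# never holds a whole file in memory, e.g. file_write(dst, file_read(src)).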


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
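
# Illustrative round trip for the pair above (a sketch, not used by the
# module itself):
#   chunks = ['hello ', 'world']
#   assert ''.join(zip_decompress(zip_compress(iter(chunks)))) == 'hello world'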


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot, so strip it to match the bare
  # extensions listed in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      fs.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
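
# Illustrative sketch (not exercised by this module): preparing an item before
# upload. hashlib.sha1 matches the 'sha-1' digest used by default namespaces;
# 'out/data.bin' is a hypothetical path.
#   import hashlib
#   item = FileItem('out/data.bin')
#   item.prepare(hashlib.sha1)
#   print item.digest, item.size  # hex digest and byte count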


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of the isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns a URL that can be used to fetch the given item once uploaded.

    Note that if the namespace uses compression, data at the given URL is
    compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    'get_missing_items' call. 'get_missing_items' returns a |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
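
  # Illustrative usage sketch (not exercised by this module); assumes |cache|
  # is a LocalCache implementation and |digests| maps digest -> expected size:
  #   queue = FetchQueue(storage, cache)
  #   for digest, size in digests.iteritems():
  #     queue.add(digest, size)
  #   remaining = set(digests)
  #   while remaining:
  #     remaining.discard(queue.wait(remaining))
  #   assert queue.verify_all_cached()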


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with the given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size
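

# The class above encodes the two upload paths: a preupload response that
# carries a 'gs_upload_url' is uploaded straight to Google Storage and then
# finalized via finalize_gs_upload, while entries without one are stored
# through store_inline and need no finalization step.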


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
              source_url, self._namespace, digest))

    # Entities small enough to be stored inline in the datastore come back
    # directly in the response, base64-encoded.
    content = response.get('content')
    if content is not None:
      return base64.b64decode(content)

    # Larger entities live in Google Storage; the response carries a URL to
    # stream them from.
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| was requested, verify that the server respects it by
    # checking the Content-Range header.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it cannot be rewound back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap the |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
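      # (sys.maxsize is 2**31 - 1 on 32-bit Python, so max_size works out one
      # byte short of 512 MiB there; on 64-bit builds max_size is so large the
      # throttle below effectively never blocks.)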
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001124
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001125 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001126 # Ensure all items were initialized with 'prepare' call. Storage does that.
1127 assert all(i.digest is not None and i.size is not None for i in items)
1128
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001129 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001130 body = {
1131 'items': [
1132 {
1133 'digest': item.digest,
1134 'is_isolated': bool(item.high_priority),
1135 'size': item.size,
1136 } for item in items
1137 ],
1138 'namespace': self._namespace_dict,
1139 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001140
Cory Massarocc19c8c2015-03-10 13:35:11 -07001141 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001142
1143 # Response body is a list of push_urls (or null if file is already present).
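    # For illustration only, it looks roughly like this (values hypothetical):
    #   {'items': [{'index': '0', 'upload_ticket': 'opaque-ticket'}]}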
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _IsolateServerPushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hits',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return an http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      The decoded JSON response, as returned by net.url_read_json(), or None
      on failure.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # Database upload: the content is included in the request body.
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # Upload to Google Storage.
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        url=url)
    return response is not None


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
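
  Example (a minimal sketch; the digest value is hypothetical):

    with MemoryCache() as cache:
      cache.write('deadbeef', ['some ', 'content'])
      assert cache.read('deadbeef') == 'some content'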
  """
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures the item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of the item to check.
      size: expected size of this item.

    Returns:
      True if the item is in the cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes the item from the cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from the |content| generator and stores it in the cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures the file at |dest| has the same content as the cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND the file mode with. The default value
          makes all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      fs.chmod(dest, file_mode & self._file_mode_mask)


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, the cache may unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
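
    Example (a sketch mirroring the command line defaults below):
      policies = CachePolicies(
          max_cache_size=20*1024*1024*1024,  # 20 GiB.
          min_free_space=2*1024*1024*1024,  # 2 GiB.
          max_items=100000)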
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
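
  The layout is roughly (a sketch; digests shortened for readability):

    <cache_dir>/
      state.json    # Serialized LRU ordering and file sizes.
      0a1b2c...     # One flat file per cached item, named by its hash digest.
      7f8e9d...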
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
    """
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)

    # All protected methods (starting with '_') except _path should be called
    # with this lock locked.
    self._lock = threading_utils.LockWithAssert()
    self._lru = lru.LRUDict()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    with tools.Profiler('Setup'):
      with self._lock:
        self._load()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) removed',
          len(self._removed), sum(self._removed) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def touch(self, digest, size):
    """Verifies that an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      return True

  def evict(self, digest):
    with self._lock:
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def read(self, digest):
    with fs.open(self._path(digest), 'rb') as f:
      return f.read()

  def write(self, digest, content):
    assert content is not None
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have the
    # write access bit removed, which would cause the file_write() call to
    # fail to open it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      # 1) Inside the |content| generator in case of network or unzipping
      #    errors.
      # 2) Inside file_write itself in case of disk IO errors.
      # In any case, delete the incomplete file and propagate the exception to
      # the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry pointing to this
    # file becomes read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)

  def hardlink(self, digest, dest, file_mode):
    """Hardlinks the file to |dest|.

    Note that the file permission bits are on the file node, not the directory
    entry, so changing the access bit on any of the directory entries for the
    file node will affect them all.
    """
    path = self._path(digest)
    if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
      # Report to the server that it failed with more details. We'll want to
      # squash them all.
      on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))

    if file_mode is not None:
      # Ignores all other bits.
      fs.chmod(dest, file_mode & 0500)

  def _load(self):
    """Loads the state of the cache from the json file."""
    self._lock.assert_locked()

    if not os.path.isdir(self.cache_dir):
      fs.makedirs(self.cache_dir)
    else:
      # Make sure the cache is read-only.
      # TODO(maruel): Calculate the cost and optimize the performance
      # accordingly.
      file_path.make_tree_read_only(self.cache_dir)

    # Load the state of the cache.
    if fs.isfile(self.state_file):
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)

    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    unknown = []
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        continue
      if filename in previous:
        previous.remove(filename)
        continue
      # An untracked file.
      if not isolated_format.is_valid_hash(filename, self.hash_algo):
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if fs.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            pass
        else:
          file_path.try_remove(p)
        continue
      # A file that's not referenced in 'state.json'.
      # TODO(vadimsh): Verify its SHA1 matches the file name.
      logging.warning('Adding unknown file %s to cache', filename)
      unknown.append(filename)

    if unknown:
      # Add them as the oldest files. They will be deleted eventually if not
      # accessed.
      self._add_oldest_list(unknown)
      logging.warning('Added back %d unknown files', len(unknown))

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)
    self._trim()

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know about and ensures enough free space."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file()

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self._remove_lru_file()
      self._free_disk = file_path.get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if self.policies.max_cache_size:
        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    digest, size = self._lru.pop_oldest()
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)

  def _add_oldest_list(self, digests):
    """Adds a bunch of items into the LRU cache, marking them as the oldest."""
    self._lock.assert_locked()
    pairs = []
    for digest in digests:
      size = fs.stat(self._path(digest)).st_size
      self._added.append(size)
      pairs.append((digest, size))
    self._lru.batch_insert_oldest(pairs)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes the cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._removed.append(size)
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for the data files
    to finish fetching though.
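
    For instance (a sketch with hypothetical names): if root.isolated includes
    [a.isolated, b.isolated] and a.isolated includes c.isolated, the nodes are
    processed in the order root, a, c, b, and the first .isolated file that
    defines a given path wins over later ones.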
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, then parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grab properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use for shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression schemes used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns a Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use for shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression schemes used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
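
  Example (a minimal sketch; the server URL and items are hypothetical):

    with get_storage('https://isolate.example.com', 'default-gzip') as storage:
      storage.upload_items(items)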
  """
  return Storage(get_storage_api(url, namespace))


def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
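  # For illustration, a typical |infiles| pair (values hypothetical):
  #   (u'/abs/path/foo', {'h': 'deadbeef...', 's': 123, 'priority': '0'})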
  # Convert |infiles| into a list of FileItem objects, skipping duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)


def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map the file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
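
  Example (a sketch; the server URL and hash are hypothetical):

    with get_storage('https://isolate.example.com', 'default-gzip') as storage:
      bundle = fetch_isolated(
          '0123abcd...', storage, MemoryCache(), u'/tmp/out', False)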
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to the cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

    # Load all *.isolated and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)
    if require_command and not bundle.command:
      # TODO(vadimsh): All fetch operations are already enqueued and there's
      # no easy way to cancel them.
      raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create the file system hierarchy.
      if not fs.isdir(outdir):
        fs.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not fs.isdir(cwd):
        fs.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link the corresponding files to the fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # The cache could evict some items we just tried to fetch; it's a fatal
    # error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
      relpath: isolated_format.file_to_metadata(
          os.path.join(root, relpath), {}, 0, algo)
      for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded, but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
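
  Example (a sketch; the server URL is hypothetical):
    isolateserver.py archive -I https://isolate.example.com some_directory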
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
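
  Example (a sketch; the server URL and hash are hypothetical):
    isolateserver.py download -I https://isolate.example.com \
        -s 0123abcd... -t target_dir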
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable the cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching a whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to the parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # The |options.cache| path may not exist until a DiskCache() instance is
    # created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002219 sys.exit(main(sys.argv[1:]))