blob: 3429f64c7c80db9c9a03f822c11642099da461d0 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
maruel12e30012015-10-09 11:55:35 -07008__version__ = '0.4.5'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000017import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050018import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000019import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050021import types
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
maruel12e30012015-10-09 11:55:35 -070031from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040032from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040033from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000034from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040035from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000036from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000037from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080039import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040040import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080041
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000042
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of paths that should be skipped when archiving a directory tree.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
106
107
class Error(Exception):
  """Base exception for runtime failures raised by this module."""
111
112
class Aborted(Error):
  """Raised when an operation is cancelled, e.g. via Ctrl+C."""
116
117
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them.

  Stops at the first empty read, i.e. at end of stream.
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
125
126
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Reads the file at |path| and yields its content in |chunk_size| pieces.

  Reading starts at byte |offset|; the last chunk may be shorter than
  |chunk_size|.
  """
  with fs.open(path, 'rb') as src:
    if offset:
      src.seek(offset)
    chunk = src.read(chunk_size)
    while chunk:
      yield chunk
      chunk = src.read(chunk_size)
137
138
def file_write(path, content_generator):
  """Writes chunks produced by |content_generator| to the file at |path|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent = os.path.dirname(path)
  if not fs.isdir(parent):
    fs.makedirs(parent)
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000157
158
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  zipper = zlib.compressobj(level)
  for piece in content_generator:
    out = zipper.compress(piece)
    # zlib may buffer input internally and emit nothing for a given chunk.
    if out:
      yield out
  trailer = zipper.flush(zlib.Z_FINISH)
  if trailer:
    yield trailer
169
170
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  # Total compressed bytes consumed so far; only used in the error message.
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # The |chunk_size| cap may leave part of |chunk| unprocessed in
      # |unconsumed_tail|; keep draining it in |chunk_size| pieces until the
      # input chunk is fully consumed.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Flush whatever remains buffered after the last input chunk.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
201
202
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png'), while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'png'). Strip the dot
  # so the membership test can actually match; otherwise already-compressed
  # files would be pointlessly recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
208
209
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file path.
  dirs = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in dirs:
      dirs.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for d in sorted(dirs):
    fs.mkdir(os.path.join(base_directory, d))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000222
223
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, props in files:
    # Only entries carrying an 'l' (link target) property are symlinks.
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    target = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], target)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000236
237
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  found = fs.stat(path).st_size
  if found == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), found, size)
  return False
252
253
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until computed by prepare().
    self.digest = digest
    # Size in bytes of the content; None until computed by prepare().
    self.size = size
    # True to upload this item before regular-priority ones.
    self.high_priority = high_priority
    # Default zlib compression level used when the namespace compresses.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if both
    are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    byte_count = 0
    for chunk in self.content():
      hasher.update(chunk)
      byte_count += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = byte_count
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000293
294
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Stat the file only when the caller did not supply the size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Returns an iterator over the file's content, read from disk."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000312
313
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self.buffer = buf

  def content(self):
    """Returns the whole buffer as a single chunk."""
    return [self.buffer]
323
324
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    # Low-level transport; also defines |namespace| and |location|.
    self._storage_api = storage_api
    # Whether this namespace requires zlib compression of uploaded content.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    # Hash algorithm implied by the namespace; used to name content.
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set once by abort(); read without locking (see abort() for rationale).
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping. Created lazily."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError. Lazy."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface.

    Installs SIGINT/SIGTERM handlers that call abort(), saving the previous
    handlers so __exit__ can restore them.
    """
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Waits for pending work, then restores the original signal handlers.
    """
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      # async_push() always reports back the very item it was given.
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
          describing how to upload the item (for example in case of cloud
          storage, it is signed upload URLs). It can later be passed to
          'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000639
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000640
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800641def batch_items_for_check(items):
642 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000643
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800644 Each batch corresponds to a single 'exists?' query to the server via a call
645 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000646
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800647 Arguments:
648 items: a list of Item objects.
649
650 Yields:
651 Batches of items to query for existence in a single operation,
652 each batch is a list of Item objects.
653 """
654 batch_count = 0
655 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
656 next_queries = []
657 for item in sorted(items, key=lambda x: x.size, reverse=True):
658 next_queries.append(item)
659 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000660 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800661 next_queries = []
662 batch_count += 1
663 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
664 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
665 if next_queries:
666 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000667
668
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000669class FetchQueue(object):
670 """Fetches items from Storage and places them into LocalCache.
671
672 It manages multiple concurrent fetch operations. Acts as a bridge between
673 Storage and LocalCache so that Storage and LocalCache don't depend on each
674 other at all.
675 """
676
677 def __init__(self, storage, cache):
678 self.storage = storage
679 self.cache = cache
680 self._channel = threading_utils.TaskChannel()
681 self._pending = set()
682 self._accessed = set()
683 self._fetched = cache.cached_set()
684
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400685 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700686 self,
687 digest,
688 size=UNKNOWN_FILE_SIZE,
689 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000690 """Starts asynchronous fetch of item |digest|."""
691 # Fetching it now?
692 if digest in self._pending:
693 return
694
695 # Mark this file as in use, verify_all_cached will later ensure it is still
696 # in cache.
697 self._accessed.add(digest)
698
699 # Already fetched? Notify cache to update item's LRU position.
700 if digest in self._fetched:
701 # 'touch' returns True if item is in cache and not corrupted.
702 if self.cache.touch(digest, size):
703 return
704 # Item is corrupted, remove it from cache and fetch it again.
705 self._fetched.remove(digest)
706 self.cache.evict(digest)
707
708 # TODO(maruel): It should look at the free disk space, the current cache
709 # size and the size of the new item on every new item:
710 # - Trim the cache as more entries are listed when free disk space is low,
711 # otherwise if the amount of data downloaded during the run > free disk
712 # space, it'll crash.
713 # - Make sure there's enough free disk space to fit all dependencies of
714 # this run! If not, abort early.
715
716 # Start fetching.
717 self._pending.add(digest)
718 self.storage.async_fetch(
719 self._channel, priority, digest, size,
720 functools.partial(self.cache.write, digest))
721
722 def wait(self, digests):
723 """Starts a loop that waits for at least one of |digests| to be retrieved.
724
725 Returns the first digest retrieved.
726 """
727 # Flush any already fetched items.
728 for digest in digests:
729 if digest in self._fetched:
730 return digest
731
732 # Ensure all requested items are being fetched now.
733 assert all(digest in self._pending for digest in digests), (
734 digests, self._pending)
735
736 # Wait for some requested item to finish fetching.
737 while self._pending:
738 digest = self._channel.pull()
739 self._pending.remove(digest)
740 self._fetched.add(digest)
741 if digest in digests:
742 return digest
743
744 # Should never reach this point due to assert above.
745 raise RuntimeError('Impossible state')
746
747 def inject_local_file(self, path, algo):
748 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700749 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000750 data = f.read()
751 digest = algo(data).hexdigest()
752 self.cache.write(digest, [data])
753 self._fetched.add(digest)
754 return digest
755
756 @property
757 def pending_count(self):
758 """Returns number of items to be fetched."""
759 return len(self._pending)
760
761 def verify_all_cached(self):
762 """True if all accessed items are in cache."""
763 return self._accessed.issubset(self.cache.cached_set())
764
765
class FetchStreamVerifier(object):
  """Guards LocalCache from incomplete downloads.

  Re-yields a chunk stream, validating the accumulated size before the final
  chunk is released to the consumer.
  """

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Keeps a one-chunk lookahead so the whole stream can be validated before
    the consumer receives the last piece. IOError raised by the consumer is
    converted to MappingError, since otherwise Storage would retry the fetch
    on unrelated local cache errors.
    """
    held = None
    for incoming in self.stream:
      assert incoming is not None
      if held is not None:
        self._inspect_chunk(held, is_last=False)
        try:
          yield held
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      held = incoming
    if held is not None:
      self._inspect_chunk(held, is_last=True)
      try:
        yield held
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if not is_last:
      return
    if (self.expected_size != UNKNOWN_FILE_SIZE and
        self.expected_size != self.current_size):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
813
814
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000815class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800816 """Interface for classes that implement low-level storage operations.
817
818 StorageApi is oblivious of compression and hashing scheme used. This details
819 are handled in higher level Storage class.
820
821 Clients should generally not use StorageApi directly. Storage class is
822 preferred since it implements compression and upload optimizations.
823 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000824
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700825 @property
826 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500827 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700828 raise NotImplementedError()
829
830 @property
831 def namespace(self):
832 """Isolate namespace used by this storage.
833
834 Indirectly defines hashing scheme and compression method used.
835 """
836 raise NotImplementedError()
837
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800838 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000839 """Fetches an object and yields its content.
840
841 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000842 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800843 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000844
845 Yields:
846 Chunks of downloaded item (as str objects).
847 """
848 raise NotImplementedError()
849
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800850 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000851 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000852
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800853 |item| MUST go through 'contains' call to get |push_state| before it can
854 be pushed to the storage.
855
856 To be clear, here is one possible usage:
857 all_items = [... all items to push as Item subclasses ...]
858 for missing_item, push_state in storage_api.contains(all_items).items():
859 storage_api.push(missing_item, push_state)
860
861 When pushing to a namespace with compression, data that should be pushed
862 and data provided by the item is not the same. In that case |content| is
863 not None and it yields chunks of compressed data (using item.content() as
864 a source of original uncompressed data). This is implemented by Storage
865 class.
866
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000867 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000868 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800869 push_state: push state object as returned by 'contains' call.
870 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000871
872 Returns:
873 None.
874 """
875 raise NotImplementedError()
876
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000877 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800878 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000879
880 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800881 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000882
883 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800884 A dict missing Item -> opaque push state object to be passed to 'push'.
885 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000886 """
887 raise NotImplementedError()
888
889
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800890class _IsolateServerPushState(object):
891 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500892
893 Note this needs to be a global class to support pickling.
894 """
895
Cory Massarocc19c8c2015-03-10 13:35:11 -0700896 def __init__(self, preupload_status, size):
897 self.preupload_status = preupload_status
898 gs_upload_url = preupload_status.get('gs_upload_url') or None
899 if gs_upload_url:
900 self.upload_url = gs_upload_url
901 self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
902 else:
903 self.upload_url = '_ah/api/isolateservice/v1/store_inline'
904 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -0500905 self.uploaded = False
906 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500907 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -0500908
909
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000910class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000911 """StorageApi implementation that downloads and uploads to Isolate Server.
912
913 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800914 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000915 """
916
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000917 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000918 super(IsolateServer, self).__init__()
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500919 assert file_path.is_url(base_url), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700920 self._base_url = base_url.rstrip('/')
921 self._namespace = namespace
Cory Massarocc19c8c2015-03-10 13:35:11 -0700922 self._namespace_dict = {
923 'compression': 'flate' if namespace.endswith(
924 ('-gzip', '-flate')) else '',
925 'digest_hash': 'sha-1',
926 'namespace': namespace,
927 }
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000928 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000929 self._server_caps = None
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500930 self._memory_use = 0
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000931
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000932 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000933 def _server_capabilities(self):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700934 """Gets server details.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000935
936 Returns:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700937 Server capabilities dictionary as returned by /server_details endpoint.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000938 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000939 # TODO(maruel): Make this request much earlier asynchronously while the
940 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800941
942 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
943 # namespace-level ACLs to this call.
Cory Massarocc19c8c2015-03-10 13:35:11 -0700944
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000945 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000946 if self._server_caps is None:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700947 self._server_caps = net.url_read_json(
948 url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
949 data={})
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000950 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000951
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700952 @property
953 def location(self):
954 return self._base_url
955
956 @property
957 def namespace(self):
958 return self._namespace
959
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800960 def fetch(self, digest, offset=0):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700961 assert offset >= 0
962 source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
963 self._base_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800964 logging.debug('download_file(%s, %d)', source_url, offset)
Cory Massarocc19c8c2015-03-10 13:35:11 -0700965 response = self.do_fetch(source_url, digest, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000966
Cory Massarocc19c8c2015-03-10 13:35:11 -0700967 if not response:
maruele154f9c2015-09-14 11:03:15 -0700968 raise IOError(
969 'Attempted to fetch from %s; no data exist: %s / %s.' % (
970 source_url, self._namespace, digest))
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800971
Cory Massarocc19c8c2015-03-10 13:35:11 -0700972 # for DB uploads
973 content = response.get('content')
974 if content is not None:
975 return base64.b64decode(content)
976
977 # for GS entities
978 connection = net.url_open(response['url'])
maruelf5574752015-09-17 13:40:27 -0700979 if not connection:
980 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
Cory Massarocc19c8c2015-03-10 13:35:11 -0700981
982 # If |offset|, verify server respects it by checking Content-Range.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800983 if offset:
984 content_range = connection.get_header('Content-Range')
985 if not content_range:
986 raise IOError('Missing Content-Range header')
987
988 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
989 # According to a spec, <size> can be '*' meaning "Total size of the file
990 # is not known in advance".
991 try:
992 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
993 if not match:
994 raise ValueError()
995 content_offset = int(match.group(1))
996 last_byte_index = int(match.group(2))
997 size = None if match.group(3) == '*' else int(match.group(3))
998 except ValueError:
999 raise IOError('Invalid Content-Range header: %s' % content_range)
1000
1001 # Ensure returned offset equals requested one.
1002 if offset != content_offset:
1003 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1004 offset, content_offset, content_range))
1005
1006 # Ensure entire tail of the file is returned.
1007 if size is not None and last_byte_index + 1 != size:
1008 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001010 return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001011
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001012 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001013 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001014 assert item.digest is not None
1015 assert item.size is not None
1016 assert isinstance(push_state, _IsolateServerPushState)
1017 assert not push_state.finalized
1018
1019 # Default to item.content().
1020 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001021 logging.info('Push state size: %d', push_state.size)
1022 if isinstance(content, (basestring, list)):
1023 # Memory is already used, too late.
1024 with self._lock:
1025 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001026 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001027 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1028 # If |content| is indeed a generator, it can not be re-winded back to the
1029 # beginning of the stream. A retry will find it exhausted. A possible
1030 # solution is to wrap |content| generator with some sort of caching
1031 # restartable generator. It should be done alongside streaming support
1032 # implementation.
1033 #
1034 # In theory, we should keep the generator, so that it is not serialized in
1035 # memory. Sadly net.HttpService.request() requires the body to be
1036 # serialized.
1037 assert isinstance(content, types.GeneratorType), repr(content)
1038 slept = False
1039 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001040 # One byte less than 512mb. This is to cope with incompressible content.
1041 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001042 while True:
1043 with self._lock:
1044 # This is due to 32 bits python when uploading very large files. The
1045 # problem is that it's comparing uncompressed sizes, while we care
1046 # about compressed sizes since it's what is serialized in memory.
1047 # The first check assumes large files are compressible and that by
1048 # throttling one upload at once, we can survive. Otherwise, kaboom.
1049 memory_use = self._memory_use
1050 if ((push_state.size >= max_size and not memory_use) or
1051 (memory_use + push_state.size <= max_size)):
1052 self._memory_use += push_state.size
1053 memory_use = self._memory_use
1054 break
1055 time.sleep(0.1)
1056 slept = True
1057 if slept:
1058 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001059
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001060 try:
1061 # This push operation may be a retry after failed finalization call below,
1062 # no need to reupload contents in that case.
1063 if not push_state.uploaded:
1064 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001065 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001066 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001067 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001068 item.digest, push_state.upload_url))
1069 push_state.uploaded = True
1070 else:
1071 logging.info(
1072 'A file %s already uploaded, retrying finalization only',
1073 item.digest)
1074
1075 # Optionally notify the server that it's done.
1076 if push_state.finalize_url:
1077 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1078 # send it to isolated server. That way isolate server can verify that
1079 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1080 # stored files).
1081 # TODO(maruel): Fix the server to accept properly data={} so
1082 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001083 response = net.url_read_json(
1084 url='%s/%s' % (self._base_url, push_state.finalize_url),
1085 data={
1086 'upload_ticket': push_state.preupload_status['upload_ticket'],
1087 })
1088 if not response or not response['ok']:
1089 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001090 push_state.finalized = True
1091 finally:
1092 with self._lock:
1093 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001094
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001095 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001096 # Ensure all items were initialized with 'prepare' call. Storage does that.
1097 assert all(i.digest is not None and i.size is not None for i in items)
1098
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001099 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001100 body = {
1101 'items': [
1102 {
1103 'digest': item.digest,
1104 'is_isolated': bool(item.high_priority),
1105 'size': item.size,
1106 } for item in items
1107 ],
1108 'namespace': self._namespace_dict,
1109 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001110
Cory Massarocc19c8c2015-03-10 13:35:11 -07001111 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001112
1113 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001114 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001115 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001116 response = net.url_read_json(url=query_url, data=body)
1117 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001118 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001119 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001120 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001121 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001122 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001123
1124 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001125 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001126 for preupload_status in response.get('items', []):
1127 assert 'upload_ticket' in preupload_status, (
1128 preupload_status, '/preupload did not generate an upload ticket')
1129 index = int(preupload_status['index'])
1130 missing_items[items[index]] = _IsolateServerPushState(
1131 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001132 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001133 len(items), len(items) - len(missing_items))
1134 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001135
Cory Massarocc19c8c2015-03-10 13:35:11 -07001136 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001137 """Fetches isolated data from the URL.
1138
1139 Used only for fetching files, not for API calls. Can be overridden in
1140 subclasses.
1141
1142 Args:
1143 url: URL to fetch the data from, can possibly return http redirect.
1144 offset: byte offset inside the file to start fetching from.
1145
1146 Returns:
1147 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1148 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001149 assert isinstance(offset, int)
1150 data = {
1151 'digest': digest.encode('utf-8'),
1152 'namespace': self._namespace_dict,
1153 'offset': offset,
1154 }
maruel0c25f4f2015-12-15 05:41:17 -08001155 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1156 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001157 return net.url_read_json(
1158 url=url,
1159 data=data,
1160 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001161
Cory Massarocc19c8c2015-03-10 13:35:11 -07001162 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001163 """Uploads isolated file to the URL.
1164
1165 Used only for storing files, not for API calls. Can be overridden in
1166 subclasses.
1167
1168 Args:
1169 url: URL to upload the data to.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001170 push_state: an _IsolateServicePushState instance
1171 item: the original Item to be uploaded
Vadim Shtayura8623c272014-12-01 11:45:27 -08001172 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001173 """
1174 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1175 # upload support is implemented.
1176 if isinstance(content, list) and len(content) == 1:
1177 content = content[0]
1178 else:
1179 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001180
1181 # DB upload
1182 if not push_state.finalize_url:
1183 url = '%s/%s' % (self._base_url, push_state.upload_url)
1184 content = base64.b64encode(content)
1185 data = {
1186 'upload_ticket': push_state.preupload_status['upload_ticket'],
1187 'content': content,
1188 }
1189 response = net.url_read_json(url=url, data=data)
1190 return response is not None and response['ok']
1191
1192 # upload to GS
1193 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001194 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001195 content_type='application/octet-stream',
1196 data=content,
1197 method='PUT',
1198 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001199 return response is not None
1200
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001201
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001202class LocalCache(object):
1203 """Local cache that stores objects fetched via Storage.
1204
1205 It can be accessed concurrently from multiple threads, so it should protect
1206 its internal state with some lock.
1207 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001208 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001209
1210 def __enter__(self):
1211 """Context manager interface."""
1212 return self
1213
1214 def __exit__(self, _exc_type, _exec_value, _traceback):
1215 """Context manager interface."""
1216 return False
1217
1218 def cached_set(self):
1219 """Returns a set of all cached digests (always a new object)."""
1220 raise NotImplementedError()
1221
1222 def touch(self, digest, size):
1223 """Ensures item is not corrupted and updates its LRU position.
1224
1225 Arguments:
1226 digest: hash digest of item to check.
1227 size: expected size of this item.
1228
1229 Returns:
1230 True if item is in cache and not corrupted.
1231 """
1232 raise NotImplementedError()
1233
1234 def evict(self, digest):
1235 """Removes item from cache if it's there."""
1236 raise NotImplementedError()
1237
1238 def read(self, digest):
1239 """Returns contents of the cached item as a single str."""
1240 raise NotImplementedError()
1241
1242 def write(self, digest, content):
1243 """Reads data from |content| generator and stores it in cache."""
1244 raise NotImplementedError()
1245
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001246 def hardlink(self, digest, dest, file_mode):
1247 """Ensures file at |dest| has same content as cached |digest|.
1248
1249 If file_mode is provided, it is used to set the executable bit if
1250 applicable.
1251 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001252 raise NotImplementedError()
1253
1254
1255class MemoryCache(LocalCache):
1256 """LocalCache implementation that stores everything in memory."""
1257
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001258 def __init__(self, file_mode_mask=0500):
1259 """Args:
1260 file_mode_mask: bit mask to AND file mode with. Default value will make
1261 all mapped files to be read only.
1262 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001263 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001264 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001265 # Let's not assume dict is thread safe.
1266 self._lock = threading.Lock()
1267 self._contents = {}
1268
1269 def cached_set(self):
1270 with self._lock:
1271 return set(self._contents)
1272
1273 def touch(self, digest, size):
1274 with self._lock:
1275 return digest in self._contents
1276
1277 def evict(self, digest):
1278 with self._lock:
1279 self._contents.pop(digest, None)
1280
1281 def read(self, digest):
1282 with self._lock:
1283 return self._contents[digest]
1284
1285 def write(self, digest, content):
1286 # Assemble whole stream before taking the lock.
1287 data = ''.join(content)
1288 with self._lock:
1289 self._contents[digest] = data
1290
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001291 def hardlink(self, digest, dest, file_mode):
1292 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001293 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001294 if file_mode is not None:
maruel12e30012015-10-09 11:55:35 -07001295 fs.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001296
1297
class CachePolicies(object):
  """Holds the retention knobs that control when a cache trims itself."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1312
1313
1314class DiskCache(LocalCache):
1315 """Stateful LRU cache in a flat hash table in a directory.
1316
1317 Saves its state as json file.
1318 """
maruel12e30012015-10-09 11:55:35 -07001319 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001320
1321 def __init__(self, cache_dir, policies, hash_algo):
1322 """
1323 Arguments:
1324 cache_dir: directory where to place the cache.
1325 policies: cache retention policies.
1326 algo: hashing algorithm used.
1327 """
1328 super(DiskCache, self).__init__()
1329 self.cache_dir = cache_dir
1330 self.policies = policies
1331 self.hash_algo = hash_algo
1332 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1333
1334 # All protected methods (starting with '_') except _path should be called
1335 # with this lock locked.
1336 self._lock = threading_utils.LockWithAssert()
1337 self._lru = lru.LRUDict()
1338
1339 # Profiling values.
1340 self._added = []
1341 self._removed = []
1342 self._free_disk = 0
1343
1344 with tools.Profiler('Setup'):
1345 with self._lock:
1346 self._load()
1347
1348 def __enter__(self):
1349 return self
1350
1351 def __exit__(self, _exc_type, _exec_value, _traceback):
1352 with tools.Profiler('CleanupTrimming'):
1353 with self._lock:
1354 self._trim()
1355
1356 logging.info(
1357 '%5d (%8dkb) added',
1358 len(self._added), sum(self._added) / 1024)
1359 logging.info(
1360 '%5d (%8dkb) current',
1361 len(self._lru),
1362 sum(self._lru.itervalues()) / 1024)
1363 logging.info(
1364 '%5d (%8dkb) removed',
1365 len(self._removed), sum(self._removed) / 1024)
1366 logging.info(
1367 ' %8dkb free',
1368 self._free_disk / 1024)
1369 return False
1370
1371 def cached_set(self):
1372 with self._lock:
1373 return self._lru.keys_set()
1374
1375 def touch(self, digest, size):
1376 """Verifies an actual file is valid.
1377
1378 Note that is doesn't compute the hash so it could still be corrupted if the
1379 file size didn't change.
1380
1381 TODO(maruel): More stringent verification while keeping the check fast.
1382 """
1383 # Do the check outside the lock.
1384 if not is_valid_file(self._path(digest), size):
1385 return False
1386
1387 # Update it's LRU position.
1388 with self._lock:
1389 if digest not in self._lru:
1390 return False
1391 self._lru.touch(digest)
1392 return True
1393
1394 def evict(self, digest):
1395 with self._lock:
1396 self._lru.pop(digest)
1397 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1398
1399 def read(self, digest):
maruel12e30012015-10-09 11:55:35 -07001400 with fs.open(self._path(digest), 'rb') as f:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001401 return f.read()
1402
1403 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001404 assert content is not None
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001405 path = self._path(digest)
1406 # A stale broken file may remain. It is possible for the file to have write
1407 # access bit removed which would cause the file_write() call to fail to open
1408 # in write mode. Take no chance here.
1409 file_path.try_remove(path)
1410 try:
1411 size = file_write(path, content)
1412 except:
1413 # There are two possible places were an exception can occur:
1414 # 1) Inside |content| generator in case of network or unzipping errors.
1415 # 2) Inside file_write itself in case of disk IO errors.
1416 # In any case delete an incomplete file and propagate the exception to
1417 # caller, it will be logged there.
1418 file_path.try_remove(path)
1419 raise
1420 # Make the file read-only in the cache. This has a few side-effects since
1421 # the file node is modified, so every directory entries to this file becomes
1422 # read-only. It's fine here because it is a new file.
1423 file_path.set_read_only(path, True)
1424 with self._lock:
1425 self._add(digest, size)
1426
1427 def hardlink(self, digest, dest, file_mode):
1428 """Hardlinks the file to |dest|.
1429
1430 Note that the file permission bits are on the file node, not the directory
1431 entry, so changing the access bit on any of the directory entries for the
1432 file node will affect them all.
1433 """
1434 path = self._path(digest)
maruel1f7e8162015-09-16 10:35:43 -07001435 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1436 # Report to the server that it failed with more details. We'll want to
1437 # squash them all.
1438 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1439
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001440 if file_mode is not None:
1441 # Ignores all other bits.
maruel12e30012015-10-09 11:55:35 -07001442 fs.chmod(dest, file_mode & 0500)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001443
1444 def _load(self):
1445 """Loads state of the cache from json file."""
1446 self._lock.assert_locked()
1447
1448 if not os.path.isdir(self.cache_dir):
maruel12e30012015-10-09 11:55:35 -07001449 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001450 else:
1451 # Make sure the cache is read-only.
1452 # TODO(maruel): Calculate the cost and optimize the performance
1453 # accordingly.
1454 file_path.make_tree_read_only(self.cache_dir)
1455
1456 # Load state of the cache.
maruel12e30012015-10-09 11:55:35 -07001457 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001458 try:
1459 self._lru = lru.LRUDict.load(self.state_file)
1460 except ValueError as err:
1461 logging.error('Failed to load cache state: %s' % (err,))
1462 # Don't want to keep broken state file.
1463 file_path.try_remove(self.state_file)
1464
1465 # Ensure that all files listed in the state still exist and add new ones.
1466 previous = self._lru.keys_set()
1467 unknown = []
maruel12e30012015-10-09 11:55:35 -07001468 for filename in fs.listdir(self.cache_dir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001469 if filename == self.STATE_FILE:
1470 continue
1471 if filename in previous:
1472 previous.remove(filename)
1473 continue
1474 # An untracked file.
1475 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1476 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001477 p = self._path(filename)
maruel12e30012015-10-09 11:55:35 -07001478 if fs.isdir(p):
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001479 try:
1480 file_path.rmtree(p)
1481 except OSError:
1482 pass
1483 else:
1484 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001485 continue
1486 # File that's not referenced in 'state.json'.
1487 # TODO(vadimsh): Verify its SHA1 matches file name.
1488 logging.warning('Adding unknown file %s to cache', filename)
1489 unknown.append(filename)
1490
1491 if unknown:
1492 # Add as oldest files. They will be deleted eventually if not accessed.
1493 self._add_oldest_list(unknown)
1494 logging.warning('Added back %d unknown files', len(unknown))
1495
1496 if previous:
1497 # Filter out entries that were not found.
1498 logging.warning('Removed %d lost files', len(previous))
1499 for filename in previous:
1500 self._lru.pop(filename)
1501 self._trim()
1502
1503 def _save(self):
1504 """Saves the LRU ordering."""
1505 self._lock.assert_locked()
1506 if sys.platform != 'win32':
1507 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001508 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001509 # Necessary otherwise the file can't be created.
1510 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001511 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001512 file_path.set_read_only(self.state_file, False)
1513 self._lru.save(self.state_file)
1514
1515 def _trim(self):
1516 """Trims anything we don't know, make sure enough free space exists."""
1517 self._lock.assert_locked()
1518
1519 # Ensure maximum cache size.
1520 if self.policies.max_cache_size:
1521 total_size = sum(self._lru.itervalues())
1522 while total_size > self.policies.max_cache_size:
1523 total_size -= self._remove_lru_file()
1524
1525 # Ensure maximum number of items in the cache.
1526 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1527 for _ in xrange(len(self._lru) - self.policies.max_items):
1528 self._remove_lru_file()
1529
1530 # Ensure enough free space.
1531 self._free_disk = file_path.get_free_space(self.cache_dir)
1532 trimmed_due_to_space = False
1533 while (
1534 self.policies.min_free_space and
1535 self._lru and
1536 self._free_disk < self.policies.min_free_space):
1537 trimmed_due_to_space = True
1538 self._remove_lru_file()
1539 self._free_disk = file_path.get_free_space(self.cache_dir)
1540 if trimmed_due_to_space:
1541 total_usage = sum(self._lru.itervalues())
1542 usage_percent = 0.
1543 if total_usage:
1544 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1545 logging.warning(
1546 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1547 'cache (%.1f%% of its maximum capacity)',
1548 self._free_disk / 1024.,
1549 total_usage / 1024.,
1550 usage_percent)
1551 self._save()
1552
1553 def _path(self, digest):
1554 """Returns the path to one item."""
1555 return os.path.join(self.cache_dir, digest)
1556
1557 def _remove_lru_file(self):
1558 """Removes the last recently used file and returns its size."""
1559 self._lock.assert_locked()
1560 digest, size = self._lru.pop_oldest()
1561 self._delete_file(digest, size)
1562 return size
1563
1564 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1565 """Adds an item into LRU cache marking it as a newest one."""
1566 self._lock.assert_locked()
1567 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001568 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001569 self._added.append(size)
1570 self._lru.add(digest, size)
1571
1572 def _add_oldest_list(self, digests):
1573 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1574 self._lock.assert_locked()
1575 pairs = []
1576 for digest in digests:
maruel12e30012015-10-09 11:55:35 -07001577 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001578 self._added.append(size)
1579 pairs.append((digest, size))
1580 self._lru.batch_insert_oldest(pairs)
1581
1582 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1583 """Deletes cache file from the file system."""
1584 self._lock.assert_locked()
1585 try:
1586 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001587 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001588 file_path.try_remove(self._path(digest))
1589 self._removed.append(size)
1590 except OSError as e:
1591 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1592
1593
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, harvested from the first *.isolated that defines one
    # (in include-traversal order, see _update_self).
    self.command = []
    # Maps relative file path -> its properties dict (e.g. 'h' digest,
    # 's' size).
    self.files = {}
    # Tri-state: stays None until some *.isolated sets it.
    self.read_only = None
    # Relative working directory; forced to '' by fetch() if never set.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated file for high-priority fetch, guarding
      # against include cycles via |seen|.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. Each property is taken from the first *.isolated (in
    # traversal order) that sets it; later nodes cannot override it.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1715
1716
def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default.

  After this call get_storage_api() instantiates |cls| instead of the
  built-in IsolateServer. May be called at most once (enforced by assert).
  """
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls
1723
1724
def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Honor an override installed via set_storage_api_class(), otherwise fall
  # back to the real IsolateServer client.
  if _storage_api_cls:
    return _storage_api_cls(url, namespace)
  return IsolateServer(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001743
1744
def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  api = get_storage_api(url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001758
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001759
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Turn |infiles| into FileItem objects, skipping duplicated paths and
  # symlinks (symlinks are not represented by items on isolate server side).
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001790
1791
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError: if |isolated_hash| is neither a valid hash
        nor a readable local file, or if the cache evicted requested items.
    isolated_format.IsolatedError: if |require_command| is set but the fetched
        *.isolated files specify no command.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  # Entering |cache| as a context manager lets it trim itself to its policies
  # on exit (see DiskCache.__exit__).
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not fs.isdir(outdir):
        fs.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not fs.isdir(cwd):
        fs.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001880
1881
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    # Strip the 't' key; it is not stored in the *.isolated metadata.
    meta.pop('t')
    metadata[relpath] = meta
  # Only entries that carry a digest ('h') become FileItem uploads.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' in meta:
      items.append(
          FileItem(
              path=os.path.join(root, relpath),
              digest=meta['h'],
              size=meta['s'],
              high_priority=relpath.endswith('.isolated')))
  return items, metadata
1903
1904
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (digest, path) pairs, one per entry in |files|.

  Raises:
    Error: on duplicate entries, unreadable paths, or paths that are neither
        files nor directories.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          # Upload the directory's files plus the generated .isolated itself.
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Clean up the temporary *.isolated files even if the upload failed.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001974
1975
def archive(out, namespace, files, blacklist):
  """Archives files or directories to the given Isolate Server.

  Arguments:
    out: URL of the Isolate Server.
    namespace: namespace to use on the Isolate Server.
    files: list of paths to upload, or ['-'] to read one path per line from
        stdin.
    blacklist: list of regexp strings used to filter out files when uploading
        a directory.

  Prints one '<hash> <path>' line per processed entry.

  Raises:
    Error if there is nothing to upload or a path cannot be processed.
  """
  if files == ['-']:
    # readlines() keeps the trailing newline on every line, which would
    # corrupt the paths ('foo\n' is not a file); strip it, plus a possible
    # '\r' on Windows, and ignore blank lines.
    files = [f.rstrip('\r\n') for f in sys.stdin.readlines()]
    files = [f for f in files if f]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
    print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1988
1989
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, file_args = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  # Any failure while hashing/uploading surfaces as Error; report it through
  # the parser so the exit code and message formatting stay consistent.
  try:
    archive(
        options.isolate_server, options.namespace, file_args,
        options.blacklist)
  except Error as exc:
    parser.error(exc.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002011
2012
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to extract a tree over an existing file or a non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest has been written out.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        # |rel| is already absolute since options.target was made absolute
        # above; the previous code joined it with options.target a second
        # time, which was redundant (os.path.join returns an absolute second
        # argument unchanged).
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
2081
2082
def add_archive_options(parser):
  """Registers the --blacklist flag used when archiving directories."""
  # Copy DEFAULT_BLACKLIST so 'append' never mutates the module-level default.
  parser.add_option(
      '--blacklist', action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2089
2090
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  # The server URL falls back to the ISOLATE_SERVER environment variable when
  # -I is not passed explicitly.
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server', metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2102
2103
def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Arguments:
    parser: the OptionParser the options came from; used to report fatal
        option errors.
    options: parsed options; options.isolate_server is normalized in place.
    set_exception_handler: when True, installs a top-level handler that
        reports uncaught exceptions to the server before exiting
        (presumably — see on_error.report_on_exception_exit).

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    # Normalizes the URL; raises ValueError on a malformed one.
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    # Makes sure credentials are available and returns the server-side
    # identity; a ValueError (e.g. bad auth configuration) is fatal.
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002121
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002122
def add_cache_options(parser):
  """Registers the local-cache tuning flags in their own option group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # Trimming thresholds: total size, free disk space and item count.
  group.add_option(
      '--max-cache-size', type='int', metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space', type='int', metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items', type='int', metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2150
2151
def process_cache_options(options):
  """Builds the cache object matching the parsed cache options.

  Returns a DiskCache rooted at --cache when one was requested, otherwise an
  in-memory cache.
  """
  # Guard clause: no --cache means everything stays in memory.
  if not options.cache:
    return MemoryCache()

  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2164
2165
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser shared by the isolateserver subcommands.

  Extends the logging-aware parser with authentication options and the
  module's --version string.
  """

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Registers the authentication-related command line options.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Validates/consumes the auth options right after parsing so every
    # subcommand gets a configured authentication state.
    auth.process_auth_options(self, options)
    return options, args
2180
2181
def main(args):
  """Dispatches |args| to the matching CMD* handler; returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002185
2186
if __name__ == '__main__':
  # One-time process setup before dispatching the command line:
  # normalizes stdout/stderr encoding (presumably for unicode output on
  # Windows consoles — see fix_encoding).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # colorama makes ANSI color escape codes work on Windows terminals.
  colorama.init()
  sys.exit(main(sys.argv[1:]))