#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.3.4'

import functools
import logging
import os
import re
import shutil
import sys
import tempfile
import threading
import time
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files to
# look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
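# Illustrative note (not in the original source): with the schedule above,
# looking up 250 items sorted by decreasing size issues /pre-upload batches of
# 20, 20, 50, 50, 50 and finally 60 items (later batches are capped at 100).
# See batch_items_for_check() below for the batching logic itself.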


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
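
# Usage sketch (illustrative only, not part of the original module): streaming
# a copy of a file through the two helpers above without holding the whole
# content in memory:
#
#   def copy_file(src, dst):
#     return file_write(dst, file_read(src))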


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
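
# Illustrative round trip (not part of the original module) through the two
# helpers above; 'chunks' and the literals are made-up example data:
#
#   chunks = ['hello ', 'world']
#   compressed = list(zip_compress(chunks, level=7))
#   assert ''.join(zip_decompress(compressed)) == 'hello world'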


def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))
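
# Illustrative example (not in the original source): for POSIX-style paths
# ['a/b/c.txt', 'a/d.txt'], create_directories() makes 'a' and then 'a/b'
# under |base_directory|; sorting guarantees parents are created first.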


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == isolated_format.UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of the isolate server; for FileSystem it is a path in the file
    system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = isolated_format.WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(
          isolated_format.DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        isolated_format.WorkerPool.HIGH if item.high_priority
        else isolated_format.WorkerPool.MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(isolated_format.DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, isolated_format.WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
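

# Usage sketch (illustrative only, not part of the original module): archiving
# a few local files through Storage. The server URL, namespace and file names
# are made up; the real tooling wires up the Storage instance elsewhere.
#
#   api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with Storage(api) as storage:
#     items = [FileItem(path) for path in ('a.bin', 'b.bin')]
#     uploaded = storage.upload_items(items)
#     logging.info('%d of %d items were new', len(uploaded), len(items))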


def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self, digest, size=isolated_format.UNKNOWN_FILE_SIZE,
      priority=isolated_format.WorkerPool.MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
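
# Usage sketch (illustrative only, not part of the original module): fetching a
# known set of digest/size pairs into a LocalCache through a FetchQueue;
# |storage|, |cache| and |wanted| are hypothetical.
#
#   queue = FetchQueue(storage, cache)
#   for digest, size in wanted:
#     queue.add(digest, size)
#   remaining = set(digest for digest, _ in wanted)
#   while remaining:
#     remaining.discard(queue.wait(remaining))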


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != isolated_format.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the type. For IsolateServer it is an URL of the
    isolate server; for FileSystem it is a path in the file system.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url):
    self.upload_url = upload_url
    self.finalize_url = finalize_url
    self.uploaded = False
    self.finalized = False


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps
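
  # Illustrative shape of a handshake response accepted by the validation
  # above (values are made up; only the keys are implied by this client code):
  #
  #   {
  #     'access_token': '<opaque token>',
  #     'protocol_version': '1.0',
  #     'server_app_version': '<server version string>',
  #   }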

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)
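
  # Illustrative example (not in the original source) of the resume check
  # above: fetch(digest, offset=100) of a 500 byte file sends
  # 'Range: bytes=100-' and expects a header such as
  # 'Content-Range: bytes 100-499/500'; a mismatched offset or a truncated
  # tail is rejected with IOError.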

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to properly accept data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    logging.info('Checking existence of %d files...', len(items))

    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items


class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is an NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """

  # Used for push_state instead of None. That way caller is forced to
  # call 'contains' before 'push'. Naively passing None in 'push' will not work.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path, namespace):
    super(FileSystem, self).__init__()
    self._base_path = base_path
    self._namespace = namespace

  @property
  def location(self):
    return self._base_path

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    return file_read(os.path.join(self._base_path, digest), offset=offset)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert push_state is self._DUMMY_PUSH_STATE
    content = item.content() if content is None else content
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]
    file_write(os.path.join(self._base_path, item.digest), content)

  def contains(self, items):
    assert all(i.digest is not None and i.size is not None for i in items)
    return dict(
        (item, self._DUMMY_PUSH_STATE) for item in items
        if not os.path.exists(os.path.join(self._base_path, item.digest))
    )
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001171
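# Example usage of FileSystem (a sketch; the mount point, the namespace and
# the 'items' list of Item objects are hypothetical):
#
#   fs = FileSystem(u'/mnt/isolate-cache', 'default')
#   missing = fs.contains(items)
#   for item, push_state in missing.iteritems():
#     fs.push(item, push_state)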
1172
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001173class LocalCache(object):
1174 """Local cache that stores objects fetched via Storage.
1175
1176 It can be accessed concurrently from multiple threads, so it should protect
1177 its internal state with some lock.
1178 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001179 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001180
1181 def __enter__(self):
1182 """Context manager interface."""
1183 return self
1184
1185  def __exit__(self, _exc_type, _exc_value, _traceback):
1186 """Context manager interface."""
1187 return False
1188
1189 def cached_set(self):
1190 """Returns a set of all cached digests (always a new object)."""
1191 raise NotImplementedError()
1192
1193 def touch(self, digest, size):
1194 """Ensures item is not corrupted and updates its LRU position.
1195
1196 Arguments:
1197 digest: hash digest of item to check.
1198 size: expected size of this item.
1199
1200 Returns:
1201 True if item is in cache and not corrupted.
1202 """
1203 raise NotImplementedError()
1204
1205 def evict(self, digest):
1206 """Removes item from cache if it's there."""
1207 raise NotImplementedError()
1208
1209 def read(self, digest):
1210 """Returns contents of the cached item as a single str."""
1211 raise NotImplementedError()
1212
1213 def write(self, digest, content):
1214 """Reads data from |content| generator and stores it in cache."""
1215 raise NotImplementedError()
1216
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001217 def hardlink(self, digest, dest, file_mode):
1218 """Ensures file at |dest| has same content as cached |digest|.
1219
1220 If file_mode is provided, it is used to set the executable bit if
1221 applicable.
1222 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001223 raise NotImplementedError()
1224
1225
1226class MemoryCache(LocalCache):
1227 """LocalCache implementation that stores everything in memory."""
1228
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001229 def __init__(self, file_mode_mask=0500):
1230 """Args:
1231 file_mode_mask: bit mask to AND file mode with. Default value will make
1232 all mapped files to be read only.
1233 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001234 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001235 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001236 # Let's not assume dict is thread safe.
1237 self._lock = threading.Lock()
1238 self._contents = {}
1239
1240 def cached_set(self):
1241 with self._lock:
1242 return set(self._contents)
1243
1244 def touch(self, digest, size):
1245 with self._lock:
1246 return digest in self._contents
1247
1248 def evict(self, digest):
1249 with self._lock:
1250 self._contents.pop(digest, None)
1251
1252 def read(self, digest):
1253 with self._lock:
1254 return self._contents[digest]
1255
1256 def write(self, digest, content):
1257 # Assemble whole stream before taking the lock.
1258 data = ''.join(content)
1259 with self._lock:
1260 self._contents[digest] = data
1261
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001262 def hardlink(self, digest, dest, file_mode):
1263 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001264 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001265 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001266 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001267
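# Example usage of MemoryCache (a sketch; the digest and destination path are
# made up):
#
#   cache = MemoryCache()
#   cache.write('deadbeef', ['chunk1', 'chunk2'])  # any iterable of str chunks.
#   assert cache.read('deadbeef') == 'chunk1chunk2'
#   cache.hardlink('deadbeef', u'/tmp/out/file', 0500)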
1268
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001269def get_storage_api(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001270  """Returns an object that implements the low-level StorageApi interface.
1271
1272  It is used by Storage to work with a single isolate |namespace|. It should
1273  rarely be used directly by clients; see 'get_storage' for a better
1274  alternative.
1275
1276 Arguments:
1277 file_or_url: a file path to use file system based storage, or URL of isolate
1278 service to use shared cloud based storage.
1279 namespace: isolate namespace to operate in, also defines hashing and
1280 compression scheme used, i.e. namespace names that end with '-gzip'
1281 store compressed data.
1282
1283 Returns:
1284 Instance of StorageApi subclass.
1285 """
Marc-Antoine Ruel37989932013-11-19 16:28:08 -05001286 if file_path.is_url(file_or_url):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001287 return IsolateServer(file_or_url, namespace)
1288 else:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001289 return FileSystem(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001290
1291
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001292def get_storage(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001293  """Returns a Storage instance that can upload and download from |namespace|.
1294
1295 Arguments:
1296 file_or_url: a file path to use file system based storage, or URL of isolate
1297 service to use shared cloud based storage.
1298 namespace: isolate namespace to operate in, also defines hashing and
1299 compression scheme used, i.e. namespace names that end with '-gzip'
1300 store compressed data.
1301
1302 Returns:
1303 Instance of Storage.
1304 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001305 return Storage(get_storage_api(file_or_url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001306
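# Example usage (a sketch; the server URL and the 'items' list of Item
# objects are illustrative):
#
#   with get_storage('https://isolateserver.appspot.com',
#                    'default-gzip') as storage:
#     storage.upload_items(items)
#
# Passing a local directory path instead of a URL selects the FileSystem
# backend via get_storage_api().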
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001307
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001308def upload_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001309 """Uploads the given tree to the given url.
1310
1311 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001312    base_url: The base url; it is assumed that |base_url|/has/ can be used to
1313 query if an element was already uploaded, and |base_url|/store/
1314 can be used to upload a new element.
1315 indir: Root directory the infiles are based in.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001316 infiles: dict of files to upload from |indir| to |base_url|.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001317 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001318 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001319 logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))
1320
1321 # Convert |indir| + |infiles| into a list of FileItem objects.
1322 # Filter out symlinks, since they are not represented by items on isolate
1323 # server side.
1324 items = [
1325 FileItem(
1326 path=os.path.join(indir, filepath),
1327 digest=metadata['h'],
1328 size=metadata['s'],
1329 high_priority=metadata.get('priority') == '0')
1330 for filepath, metadata in infiles.iteritems()
1331 if 'l' not in metadata
1332 ]
1333
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001334 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001335 storage.upload_items(items)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001336 return 0
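

# Example of the |infiles| mapping consumed by upload_tree (a sketch; the
# paths, digest and server URL are made up):
#
#   infiles = {
#     'a/test.py': {'h': '<sha-1 hex digest>', 's': 1234, 'priority': '0'},
#     'a/link': {'l': 'test.py'},  # symlink entries carry 'l' and are skipped.
#   }
#   upload_tree('https://isolateserver.appspot.com', '/checkout', infiles,
#               'default-gzip')

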
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001337def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001338 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001339
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001340 Arguments:
1341 isolated_hash: hash of the root *.isolated file.
1342 storage: Storage class that communicates with isolate storage.
1343 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001344 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001345 require_command: Ensure *.isolated specifies a command to run.
1346
1347 Returns:
1348 Settings object that holds details about loaded *.isolated file.
1349 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001350 logging.debug(
1351 'fetch_isolated(%s, %s, %s, %s, %s)',
1352 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001353 # Hash algorithm to use, defined by namespace |storage| is using.
1354 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001355 with cache:
1356 fetch_queue = FetchQueue(storage, cache)
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001357 settings = isolated_format.Settings()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001358
1359 with tools.Profiler('GetIsolateds'):
1360 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001361 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001362 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1363 try:
1364 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1365 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001366 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001367            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1368 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001369
1370 # Load all *.isolated and start loading rest of the files.
Marc-Antoine Ruel05199462014-03-13 15:40:48 -04001371 settings.load(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001372 if require_command and not settings.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001373      # TODO(vadimsh): All fetch operations are already enqueued and there's no
1374 # easy way to cancel them.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001375 raise isolated_format.IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001376
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001377 with tools.Profiler('GetRest'):
1378 # Create file system hierarchy.
1379 if not os.path.isdir(outdir):
1380 os.makedirs(outdir)
1381 create_directories(outdir, settings.files)
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -05001382 create_symlinks(outdir, settings.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001383
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001384 # Ensure working directory exists.
1385 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1386 if not os.path.isdir(cwd):
1387 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001388
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001389 # Multimap: digest -> list of pairs (path, props).
1390 remaining = {}
1391 for filepath, props in settings.files.iteritems():
1392 if 'h' in props:
1393 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001394
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001395 # Now block on the remaining files to be downloaded and mapped.
1396 logging.info('Retrieving remaining files (%d of them)...',
1397 fetch_queue.pending_count)
1398 last_update = time.time()
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001399 with threading_utils.DeadlockDetector(
1400 isolated_format.DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001401 while remaining:
1402 detector.ping()
1403
1404 # Wait for any item to finish fetching to cache.
1405 digest = fetch_queue.wait(remaining)
1406
1407 # Link corresponding files to a fetched item in cache.
1408 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001409 cache.hardlink(
1410 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001411
1412 # Report progress.
1413 duration = time.time() - last_update
1414 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1415 msg = '%d files remaining...' % len(remaining)
1416 print msg
1417 logging.info(msg)
1418 last_update = time.time()
1419
1420 # Cache could evict some items we just tried to fetch, it's a fatal error.
1421 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001422 raise isolated_format.MappingError(
1423 'Cache is too small to hold all requested files')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001424 return settings
1425
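# Example usage of fetch_isolated (a sketch; the hash, server URL and output
# directory are illustrative):
#
#   with get_storage('https://isolateserver.appspot.com',
#                    'default-gzip') as storage:
#     settings = fetch_isolated(
#         isolated_hash='<40 hex char sha-1 of the root .isolated>',
#         storage=storage,
#         cache=MemoryCache(),
#         outdir=u'/tmp/run',
#         require_command=False)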
1426
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001427def directory_to_metadata(root, algo, blacklist):
1428 """Returns the FileItem list and .isolated metadata for a directory."""
1429 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001430 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001431 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001432 metadata = {
1433 relpath: isolated_format.file_to_metadata(
1434 os.path.join(root, relpath), {}, False, algo)
1435 for relpath in paths
1436 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001437 for v in metadata.itervalues():
1438 v.pop('t')
1439 items = [
1440 FileItem(
1441 path=os.path.join(root, relpath),
1442 digest=meta['h'],
1443 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001444 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001445 for relpath, meta in metadata.iteritems() if 'h' in meta
1446 ]
1447 return items, metadata
1448
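# Example usage of directory_to_metadata (a sketch; the directory path is
# hypothetical and sha-1 is assumed to be the namespace's hashing algorithm):
#
#   import hashlib
#   items, metadata = directory_to_metadata(
#       u'/path/to/dir', hashlib.sha1, tools.gen_blacklist(DEFAULT_BLACKLIST))
#   # 'items' is a list of FileItem ready for upload; 'metadata' maps each
#   # relative path to its .isolated properties ('h', 's', 'l', 'm', ...).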
1449
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001450def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001451  """Stores every entry and returns the relevant data.
1452
1453 Arguments:
1454 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001455 files: list of file paths to upload. If a directory is specified, a
1456 .isolated file is created and its hash is returned.
1457 blacklist: function that returns True if a file should be omitted.
1458 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001459 assert all(isinstance(i, unicode) for i in files), files
1460 if len(files) != len(set(map(os.path.abspath, files))):
1461 raise Error('Duplicate entries found.')
1462
1463 results = []
1464 # The temporary directory is only created as needed.
1465 tempdir = None
1466 try:
1467 # TODO(maruel): Yield the files to a worker thread.
1468 items_to_upload = []
1469 for f in files:
1470 try:
1471 filepath = os.path.abspath(f)
1472 if os.path.isdir(filepath):
1473 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001474 items, metadata = directory_to_metadata(
1475 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001476
1477 # Create the .isolated file.
1478 if not tempdir:
1479 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1480 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1481 os.close(handle)
1482 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001483 'algo':
1484 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001485 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001486 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001487 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001488 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001489 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001490 items_to_upload.extend(items)
1491 items_to_upload.append(
1492 FileItem(
1493 path=isolated,
1494 digest=h,
1495 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001496 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001497 results.append((h, f))
1498
1499 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001500 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001501 items_to_upload.append(
1502 FileItem(
1503 path=filepath,
1504 digest=h,
1505 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001506 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001507 results.append((h, f))
1508 else:
1509        raise Error('%s is neither a file nor a directory.' % f)
1510 except OSError:
1511 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001512    # Technically we would care about which files were uploaded, but in
1513    # practice we don't.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001514 _uploaded_files = storage.upload_items(items_to_upload)
1515 return results
1516 finally:
1517 if tempdir:
1518 shutil.rmtree(tempdir)
1519
1520
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001521def archive(out, namespace, files, blacklist):
1522 if files == ['-']:
1523 files = sys.stdin.readlines()
1524
1525 if not files:
1526 raise Error('Nothing to upload')
1527
1528 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001529 blacklist = tools.gen_blacklist(blacklist)
1530 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001531 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001532 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1533
1534
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001535@subcommand.usage('<file1..fileN> or - to read from stdin')
1536def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001537 """Archives data to the server.
1538
1539 If a directory is specified, a .isolated file is created the whole directory
1540 is uploaded. Then this .isolated file can be included in another one to run
1541 commands.
1542
1543 The commands output each file that was processed with its content hash. For
1544 directories, the .isolated generated for the directory is listed as the
1545 directory entry itself.
1546 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001547 add_isolate_server_options(parser, False)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001548 parser.add_option(
1549 '--blacklist',
1550 action='append', default=list(DEFAULT_BLACKLIST),
1551 help='List of regexp to use as blacklist filter when uploading '
1552 'directories')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001553 options, files = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001554 process_isolate_server_options(parser, options)
Vadim Shtayura6b555c12014-07-23 16:22:18 -07001555 if file_path.is_url(options.isolate_server):
1556 auth.ensure_logged_in(options.isolate_server)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001557 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001558 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001559 except Error as e:
1560 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001561 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001562
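# Example command line for 'archive' (a sketch; the script name, server URL
# and path are illustrative):
#
#   isolateserver.py archive -I https://isolateserver.appspot.com \
#       --namespace default-gzip /path/to/dir
#
# One '<hash> <path>' line is printed per argument that was archived.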
1563
1564def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001565  """Downloads data from the server.
1566
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001567 It can either download individual files or a complete tree from a .isolated
1568 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001569 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001570 add_isolate_server_options(parser, True)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001571 parser.add_option(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001572 '-i', '--isolated', metavar='HASH',
1573 help='hash of an isolated file, .isolated file content is discarded, use '
1574 '--file if you need it')
1575 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001576 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1577 help='hash and destination of a file, can be used multiple times')
1578 parser.add_option(
1579 '-t', '--target', metavar='DIR', default=os.getcwd(),
1580 help='destination directory')
1581 options, args = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001582 process_isolate_server_options(parser, options)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001583 if args:
1584 parser.error('Unsupported arguments: %s' % args)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001585 if bool(options.isolated) == bool(options.file):
1586 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001587
1588 options.target = os.path.abspath(options.target)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001589
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001590 remote = options.isolate_server or options.indir
Vadim Shtayura6b555c12014-07-23 16:22:18 -07001591 if file_path.is_url(remote):
1592 auth.ensure_logged_in(remote)
1593
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001594 with get_storage(remote, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001595 # Fetching individual files.
1596 if options.file:
1597 channel = threading_utils.TaskChannel()
1598 pending = {}
1599 for digest, dest in options.file:
1600 pending[digest] = dest
1601 storage.async_fetch(
1602 channel,
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001603 isolated_format.WorkerPool.MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001604 digest,
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -04001605 isolated_format.UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001606 functools.partial(file_write, os.path.join(options.target, dest)))
1607 while pending:
1608 fetched = channel.pull()
1609 dest = pending.pop(fetched)
1610 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001611
Vadim Shtayura3172be52013-12-03 12:49:05 -08001612 # Fetching whole isolated tree.
1613 if options.isolated:
1614 settings = fetch_isolated(
1615 isolated_hash=options.isolated,
1616 storage=storage,
1617 cache=MemoryCache(),
Vadim Shtayura3172be52013-12-03 12:49:05 -08001618 outdir=options.target,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001619 require_command=False)
1620 rel = os.path.join(options.target, settings.relative_cwd)
1621      print('To run this test please run from the directory %s:' % rel)
1623 print(' ' + ' '.join(settings.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001624
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001625 return 0
1626
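# Example command lines for 'download' (a sketch; the script name, hashes,
# server URL and paths are illustrative):
#
#   # Map the whole tree described by a .isolated file:
#   isolateserver.py download -I https://isolateserver.appspot.com \
#       -i <isolated hash> -t /tmp/out
#
#   # Fetch individual files by digest:
#   isolateserver.py download -I https://isolateserver.appspot.com \
#       -f <hash> payload.bin -t /tmp/out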
1627
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001628@subcommand.usage('<file1..fileN> or - to read from stdin')
1629def CMDhashtable(parser, args):
1630 """Archives data to a hashtable on the file system.
1631
1632  If a directory is specified, a .isolated file is created and the whole
1633  directory is uploaded. Then this .isolated file can be included in another
1634  one to run commands.
1635
1636 The commands output each file that was processed with its content hash. For
1637 directories, the .isolated generated for the directory is listed as the
1638 directory entry itself.
1639 """
1640 add_outdir_options(parser)
1641 parser.add_option(
1642 '--blacklist',
1643 action='append', default=list(DEFAULT_BLACKLIST),
1644 help='List of regexp to use as blacklist filter when uploading '
1645 'directories')
1646 options, files = parser.parse_args(args)
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001647 process_outdir_options(parser, options, os.getcwd())
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001648 try:
1649 # Do not compress files when archiving to the file system.
1650 archive(options.outdir, 'default', files, options.blacklist)
1651 except Error as e:
1652 parser.error(e.args[0])
1653 return 0
1654
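# Example command line for 'hashtable' (a sketch; the script name and paths
# are illustrative):
#
#   isolateserver.py hashtable -o /path/to/hashtable /path/to/dir
#
# Entries are written to the 'default' namespace, i.e. uncompressed.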
1655
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001656def add_isolate_server_options(parser, add_indir):
1657 """Adds --isolate-server and --namespace options to parser.
1658
1659 Includes --indir if desired.
1660 """
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001661 parser.add_option(
1662 '-I', '--isolate-server',
1663 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001664 help='URL of the Isolate Server to use. Defaults to the environment '
1665 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1666 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001667 parser.add_option(
1668 '--namespace', default='default-gzip',
1669 help='The namespace to use on the Isolate Server, default: %default')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001670 if add_indir:
1671 parser.add_option(
1672 '--indir', metavar='DIR',
1673 help='Directory used to store the hashtable instead of using an '
1674 'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001675
1676
1677def process_isolate_server_options(parser, options):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001678  """Processes the --isolate-server and --indir options and aborts unless
1679  exactly one of them is specified.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001680 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001681 has_indir = hasattr(options, 'indir')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001682 if not options.isolate_server:
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001683 if not has_indir:
1684 parser.error('--isolate-server is required.')
1685 elif not options.indir:
1686 parser.error('Use one of --indir or --isolate-server.')
1687 else:
1688 if has_indir and options.indir:
1689 parser.error('Use only one of --indir or --isolate-server.')
1690
1691 if options.isolate_server:
1692 parts = urlparse.urlparse(options.isolate_server, 'https')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001693 if parts.query:
1694 parser.error('--isolate-server doesn\'t support query parameter.')
1695 if parts.fragment:
1696 parser.error('--isolate-server doesn\'t support fragment in the url.')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001697 # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
1698 # what is desired here.
1699 new = list(parts)
1700 if not new[1] and new[2]:
1701 new[1] = new[2].rstrip('/')
1702 new[2] = ''
1703 new[2] = new[2].rstrip('/')
1704 options.isolate_server = urlparse.urlunparse(new)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001705 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001706 return
1707
1708 if file_path.is_url(options.indir):
1709    parser.error('Can\'t use a URL for --indir.')
1710 options.indir = unicode(options.indir).replace('/', os.path.sep)
1711 options.indir = os.path.abspath(
1712 os.path.normpath(os.path.join(os.getcwd(), options.indir)))
1713 if not os.path.isdir(options.indir):
1714 parser.error('Path given to --indir must exist.')
1715
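# Example of the URL normalization performed by process_isolate_server_options
# (a sketch; the host name is made up):
#
#   urlparse.urlparse('foo.appspot.com', 'https')
#   # -> scheme='https', netloc='', path='foo.appspot.com'
#   # After the netloc/path fix-up, options.isolate_server becomes
#   # 'https://foo.appspot.com'.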
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001716
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001717def add_outdir_options(parser):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001718 """Adds --outdir, which is orthogonal to --isolate-server.
1719
1720 Note: On upload, separate commands are used between 'archive' and 'hashtable'.
1721 On 'download', the same command can download from either an isolate server or
1722 a file system.
1723 """
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001724 parser.add_option(
1725 '-o', '--outdir', metavar='DIR',
1726 help='Directory used to recreate the tree.')
1727
1728
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001729def process_outdir_options(parser, options, cwd):
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001730 if not options.outdir:
1731 parser.error('--outdir is required.')
1732 if file_path.is_url(options.outdir):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001733    parser.error('Can\'t use a URL for --outdir.')
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001734 options.outdir = unicode(options.outdir).replace('/', os.path.sep)
1735 # outdir doesn't need native path case since tracing is never done from there.
1736 options.outdir = os.path.abspath(
1737 os.path.normpath(os.path.join(cwd, options.outdir)))
1738 # In theory, we'd create the directory outdir right away. Defer doing it in
1739  # case there are errors in the command line.
1740
1741
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001742class OptionParserIsolateServer(tools.OptionParserWithLogging):
1743 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001744 tools.OptionParserWithLogging.__init__(
1745 self,
1746 version=__version__,
1747 prog=os.path.basename(sys.modules[__name__].__file__),
1748 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001749 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001750
1751 def parse_args(self, *args, **kwargs):
1752 options, args = tools.OptionParserWithLogging.parse_args(
1753 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001754 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001755 return options, args
1756
1757
1758def main(args):
1759 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001760 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001761
1762
1763if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001764 fix_encoding.fix_encoding()
1765 tools.disable_buffering()
1766 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001767 sys.exit(main(sys.argv[1:]))