#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc  # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
  # If not present, grpc crashes later.
  import pyasn1_modules
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024

# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None

class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total

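# Illustrative sketch, not part of the original file: a minimal in-memory
# Item subclass showing how content() and prepare() fit together. The class
# name and the use of hashlib.sha1 are assumptions made for this example; the
# real hash algorithm is chosen from the namespace by higher-level code.
#
#   import hashlib
#
#   class _ExampleBufferItem(Item):
#     def __init__(self, buf):
#       super(_ExampleBufferItem, self).__init__()
#       self._buf = buf
#
#     def content(self):
#       yield self._buf
#
#   item = _ExampleBufferItem('hello world')
#   item.prepare(hashlib.sha1)
#   # item.digest is now a sha-1 hex digest and item.size == 11.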

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, size, offset):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      size: size of the item to download if known, or None otherwise.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()

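# Illustrative sketch, not part of the original file: the expected call
# sequence against any StorageApi implementation, mirroring the docstrings
# above. 'storage_api', 'all_items', 'local_path', 'digest' and 'size' are
# placeholders; real callers normally go through the higher-level Storage
# class instead of using StorageApi directly.
#
#   # Upload whatever the server reports as missing.
#   for missing_item, push_state in storage_api.contains(all_items).items():
#     storage_api.push(missing_item, push_state)
#
#   # Download a blob by digest, streaming its chunks to a file.
#   with open(local_path, 'wb') as f:
#     for chunk in storage_api.fetch(digest, size, 0):
#       f.write(chunk)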

class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)

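# Illustrative sketch, not part of the original file: guard_memory_use() only
# accounts for memory; releasing it is the caller's job. Both push()
# implementations below follow this pattern:
#
#   guard_memory_use(self, content, size)
#   try:
#     ...  # upload |content|
#   finally:
#     with self._lock:
#       self._memory_use -= size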

class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    algo = isolated_format.get_hash_algo(namespace)
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': isolated_format.SUPPORTED_ALGOS_REVERSE[algo],
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0
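
  # Illustrative note, not part of the original file: the _namespace_dict
  # built in __init__ is sent along with 'contains' and '_do_fetch' requests.
  # For a namespace such as 'default-gzip' it would look roughly like the
  # following (the 'sha-1' value is an assumption for the default namespaces):
  #
  #   {'compression': 'flate', 'digest_hash': 'sha-1',
  #    'namespace': 'default-gzip'}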

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, _size, offset):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
            source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    if not response.get('url'):
      raise IOError(
          'Invalid response while fetching %s: %s' % (digest, response))

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response.get('ok'):
          raise IOError(
              'Failed to finalize file with hash %s\n%r' %
              (item.digest, response))
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
            {
                'digest': item.digest,
                'is_isolated': bool(item.high_priority),
                'size': item.size,
            } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: does not pass on namespace to the server (uses it only for hash
  algo and compression), and only allows zero offsets while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate with server %s, '
                 'namespace %s, proxy %s',
                 server, namespace, proxy)
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace

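  # Illustrative note, not part of the original file: fetch() and push() below
  # address blobs through ByteStream resource names of the form
  #
  #   <proxy prefix>/blobs/<digest>/<size>                  (reads)
  #   <proxy prefix>/uploads/<uuid>/blobs/<digest>/<size>   (writes)
  #
  # where the prefix comes from grpc_proxy.Proxy and <size> may be -1 for
  # reads when the size is not known in advance.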

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, size, offset):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    if not size:
      size = -1
    request.resource_name = '%s/blobs/%s/%d' % (
        self._proxy.prefix, digest, size)
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk

      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
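
# Illustrative sketch, not part of the original file: typical use of this
# module, assuming it is importable as 'isolate_storage'. The server URL,
# namespace, proxy address and 'items' list are placeholders.
#
#   import isolate_storage
#
#   # Optionally route all traffic through a gRPC proxy instead of the HTTP
#   # API; must be called before get_storage_api().
#   # isolate_storage.set_grpc_proxy('<proxy address>')
#
#   storage_api = isolate_storage.get_storage_api(
#       'https://isolate.example.com', 'default-gzip')
#   for item, push_state in storage_api.contains(items).items():
#     storage_api.push(item, push_state)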