#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server."""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc  # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread, and finally back to
  the main thread. It is never used concurrently from multiple threads.
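
  Example (an illustrative sketch; BufferItem is a hypothetical subclass that
  serves an in-memory string):

    class BufferItem(Item):
      def __init__(self, buf, high_priority=False):
        super(BufferItem, self).__init__(None, len(buf), high_priority)
        self.buffer = buf

      def content(self):
        return [self.buffer]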
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
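
    Example (a sketch; hashlib.sha1 matches the 'sha-1' digest used by the
    default Isolate namespaces, but the algorithm is ultimately chosen by
    Storage):

      import hashlib
      item.prepare(hashlib.sha1)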
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If True, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
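
  Illustrative usage (mirrors how the push() implementations below pair it
  with a matching decrement in a 'finally' block):

    guard_memory_use(self, content, push_state.size)
    try:
      ...  # upload |content|
    finally:
      with self._lock:
        self._memory_use -= push_state.size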
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at a time, we can survive. Otherwise,
        # kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
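      # For example (hypothetical values), resuming a fetch at offset 200 of a
      # 1024-byte blob would produce 'bytes 200-1023/1024'.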
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
              'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
        'upload_ticket': push_state.preupload_status['upload_ticket'],
        'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary.
    assert namespace == 'default-gzip'
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item.
    request.resource_name = '%s/blobs/%s/0' % (
        self._proxy.prefix, digest)
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Yields one chunk of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
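
  Example (illustrative only; the server URL is hypothetical):

    storage_api = get_storage_api(
        'https://isolate.example.com', 'default-gzip')
    for item, push_state in storage_api.contains(items).items():
      storage_api.push(item, push_state)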
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)